diff options
Diffstat (limited to 'Doc/includes/mp_pool.py')
-rw-r--r-- | Doc/includes/mp_pool.py | 347 |
1 files changed, 254 insertions, 93 deletions
diff --git a/Doc/includes/mp_pool.py b/Doc/includes/mp_pool.py index 11101e1..0a3d92a 100644 --- a/Doc/includes/mp_pool.py +++ b/Doc/includes/mp_pool.py @@ -1,3 +1,10 @@ +# +# A test of `multiprocessing.Pool` class +# +# Copyright (c) 2006-2008, R Oudkerk +# All rights reserved. +# + import multiprocessing import time import random @@ -18,18 +25,18 @@ def calculatestar(args): return calculate(*args) def mul(a, b): - time.sleep(0.5 * random.random()) + time.sleep(0.5*random.random()) return a * b def plus(a, b): - time.sleep(0.5 * random.random()) + time.sleep(0.5*random.random()) return a + b def f(x): - return 1.0 / (x - 5.0) + return 1.0 / (x-5.0) def pow3(x): - return x ** 3 + return x**3 def noop(x): pass @@ -39,115 +46,269 @@ def noop(x): # def test(): + print 'cpu_count() = %d\n' % multiprocessing.cpu_count() + + # + # Create pool + # + PROCESSES = 4 - print('Creating pool with %d processes\n' % PROCESSES) + print 'Creating pool with %d processes\n' % PROCESSES + pool = multiprocessing.Pool(PROCESSES) + print 'pool = %s' % pool + print + + # + # Tests + # + + TASKS = [(mul, (i, 7)) for i in range(10)] + \ + [(plus, (i, 8)) for i in range(10)] + + results = [pool.apply_async(calculate, t) for t in TASKS] + imap_it = pool.imap(calculatestar, TASKS) + imap_unordered_it = pool.imap_unordered(calculatestar, TASKS) + + print 'Ordered results using pool.apply_async():' + for r in results: + print '\t', r.get() + print + + print 'Ordered results using pool.imap():' + for x in imap_it: + print '\t', x + print + + print 'Unordered results using pool.imap_unordered():' + for x in imap_unordered_it: + print '\t', x + print + + print 'Ordered results using pool.map() --- will block till complete:' + for x in pool.map(calculatestar, TASKS): + print '\t', x + print + + # + # Simple benchmarks + # + + N = 100000 + print 'def pow3(x): return x**3' - with multiprocessing.Pool(PROCESSES) as pool: - # - # Tests - # + t = time.time() + A = map(pow3, xrange(N)) + print '\tmap(pow3, xrange(%d)):\n\t\t%s seconds' % \ + (N, time.time() - t) - TASKS = [(mul, (i, 7)) for i in range(10)] + \ - [(plus, (i, 8)) for i in range(10)] + t = time.time() + B = pool.map(pow3, xrange(N)) + print '\tpool.map(pow3, xrange(%d)):\n\t\t%s seconds' % \ + (N, time.time() - t) - results = [pool.apply_async(calculate, t) for t in TASKS] - imap_it = pool.imap(calculatestar, TASKS) - imap_unordered_it = pool.imap_unordered(calculatestar, TASKS) + t = time.time() + C = list(pool.imap(pow3, xrange(N), chunksize=N//8)) + print '\tlist(pool.imap(pow3, xrange(%d), chunksize=%d)):\n\t\t%s' \ + ' seconds' % (N, N//8, time.time() - t) - print('Ordered results using pool.apply_async():') - for r in results: - print('\t', r.get()) - print() + assert A == B == C, (len(A), len(B), len(C)) + print - print('Ordered results using pool.imap():') - for x in imap_it: - print('\t', x) - print() + L = [None] * 1000000 + print 'def noop(x): pass' + print 'L = [None] * 1000000' - print('Unordered results using pool.imap_unordered():') - for x in imap_unordered_it: - print('\t', x) - print() + t = time.time() + A = map(noop, L) + print '\tmap(noop, L):\n\t\t%s seconds' % \ + (time.time() - t) - print('Ordered results using pool.map() --- will block till complete:') - for x in pool.map(calculatestar, TASKS): - print('\t', x) - print() + t = time.time() + B = pool.map(noop, L) + print '\tpool.map(noop, L):\n\t\t%s seconds' % \ + (time.time() - t) - # - # Test error handling - # + t = time.time() + C = list(pool.imap(noop, L, chunksize=len(L)//8)) + print '\tlist(pool.imap(noop, L, chunksize=%d)):\n\t\t%s seconds' % \ + (len(L)//8, time.time() - t) - print('Testing error handling:') + assert A == B == C, (len(A), len(B), len(C)) + print + del A, B, C, L + + # + # Test error handling + # + + print 'Testing error handling:' + + try: + print pool.apply(f, (5,)) + except ZeroDivisionError: + print '\tGot ZeroDivisionError as expected from pool.apply()' + else: + raise AssertionError('expected ZeroDivisionError') + + try: + print pool.map(f, range(10)) + except ZeroDivisionError: + print '\tGot ZeroDivisionError as expected from pool.map()' + else: + raise AssertionError('expected ZeroDivisionError') + + try: + print list(pool.imap(f, range(10))) + except ZeroDivisionError: + print '\tGot ZeroDivisionError as expected from list(pool.imap())' + else: + raise AssertionError('expected ZeroDivisionError') + + it = pool.imap(f, range(10)) + for i in range(10): try: - print(pool.apply(f, (5,))) + x = it.next() except ZeroDivisionError: - print('\tGot ZeroDivisionError as expected from pool.apply()') + if i == 5: + pass + except StopIteration: + break else: - raise AssertionError('expected ZeroDivisionError') + if i == 5: + raise AssertionError('expected ZeroDivisionError') + + assert i == 9 + print '\tGot ZeroDivisionError as expected from IMapIterator.next()' + print + # + # Testing timeouts + # + + print 'Testing ApplyResult.get() with timeout:', + res = pool.apply_async(calculate, TASKS[0]) + while 1: + sys.stdout.flush() try: - print(pool.map(f, list(range(10)))) - except ZeroDivisionError: - print('\tGot ZeroDivisionError as expected from pool.map()') - else: - raise AssertionError('expected ZeroDivisionError') + sys.stdout.write('\n\t%s' % res.get(0.02)) + break + except multiprocessing.TimeoutError: + sys.stdout.write('.') + print + print + print 'Testing IMapIterator.next() with timeout:', + it = pool.imap(calculatestar, TASKS) + while 1: + sys.stdout.flush() try: - print(list(pool.imap(f, list(range(10))))) - except ZeroDivisionError: - print('\tGot ZeroDivisionError as expected from list(pool.imap())') - else: - raise AssertionError('expected ZeroDivisionError') - - it = pool.imap(f, list(range(10))) - for i in range(10): - try: - x = next(it) - except ZeroDivisionError: - if i == 5: - pass - except StopIteration: - break - else: - if i == 5: - raise AssertionError('expected ZeroDivisionError') - - assert i == 9 - print('\tGot ZeroDivisionError as expected from IMapIterator.next()') - print() - - # - # Testing timeouts - # - - print('Testing ApplyResult.get() with timeout:', end=' ') - res = pool.apply_async(calculate, TASKS[0]) - while 1: - sys.stdout.flush() - try: - sys.stdout.write('\n\t%s' % res.get(0.02)) - break - except multiprocessing.TimeoutError: - sys.stdout.write('.') - print() - print() - - print('Testing IMapIterator.next() with timeout:', end=' ') - it = pool.imap(calculatestar, TASKS) - while 1: - sys.stdout.flush() - try: - sys.stdout.write('\n\t%s' % it.next(0.02)) - except StopIteration: - break - except multiprocessing.TimeoutError: - sys.stdout.write('.') - print() - print() + sys.stdout.write('\n\t%s' % it.next(0.02)) + except StopIteration: + break + except multiprocessing.TimeoutError: + sys.stdout.write('.') + print + print + + # + # Testing callback + # + + print 'Testing callback:' + + A = [] + B = [56, 0, 1, 8, 27, 64, 125, 216, 343, 512, 729] + + r = pool.apply_async(mul, (7, 8), callback=A.append) + r.wait() + + r = pool.map_async(pow3, range(10), callback=A.extend) + r.wait() + + if A == B: + print '\tcallbacks succeeded\n' + else: + print '\t*** callbacks failed\n\t\t%s != %s\n' % (A, B) + + # + # Check there are no outstanding tasks + # + + assert not pool._cache, 'cache = %r' % pool._cache + + # + # Check close() methods + # + + print 'Testing close():' + + for worker in pool._pool: + assert worker.is_alive() + + result = pool.apply_async(time.sleep, [0.5]) + pool.close() + pool.join() + + assert result.get() is None + + for worker in pool._pool: + assert not worker.is_alive() + + print '\tclose() succeeded\n' + + # + # Check terminate() method + # + + print 'Testing terminate():' + + pool = multiprocessing.Pool(2) + DELTA = 0.1 + ignore = pool.apply(pow3, [2]) + results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)] + pool.terminate() + pool.join() + + for worker in pool._pool: + assert not worker.is_alive() + + print '\tterminate() succeeded\n' + + # + # Check garbage collection + # + + print 'Testing garbage collection:' + + pool = multiprocessing.Pool(2) + DELTA = 0.1 + processes = pool._pool + ignore = pool.apply(pow3, [2]) + results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)] + + results = pool = None + + time.sleep(DELTA * 2) + + for worker in processes: + assert not worker.is_alive() + + print '\tgarbage collection succeeded\n' if __name__ == '__main__': multiprocessing.freeze_support() + + assert len(sys.argv) in (1, 2) + + if len(sys.argv) == 1 or sys.argv[1] == 'processes': + print ' Using processes '.center(79, '-') + elif sys.argv[1] == 'threads': + print ' Using threads '.center(79, '-') + import multiprocessing.dummy as multiprocessing + else: + print 'Usage:\n\t%s [processes | threads]' % sys.argv[0] + raise SystemExit(2) + test() |