diff options
Diffstat (limited to 'Doc/includes/mp_synchronize.py')
-rw-r--r-- | Doc/includes/mp_synchronize.py | 278 |
1 files changed, 0 insertions, 278 deletions
diff --git a/Doc/includes/mp_synchronize.py b/Doc/includes/mp_synchronize.py deleted file mode 100644 index 81dbc38..0000000 --- a/Doc/includes/mp_synchronize.py +++ /dev/null @@ -1,278 +0,0 @@ -# -# A test file for the `multiprocessing` package -# -# Copyright (c) 2006-2008, R Oudkerk -# All rights reserved. -# - -import time -import sys -import random -from queue import Empty - -import multiprocessing # may get overwritten - - -#### TEST_VALUE - -def value_func(running, mutex): - random.seed() - time.sleep(random.random()*4) - - mutex.acquire() - print('\n\t\t\t' + str(multiprocessing.current_process()) + ' has finished') - running.value -= 1 - mutex.release() - -def test_value(): - TASKS = 10 - running = multiprocessing.Value('i', TASKS) - mutex = multiprocessing.Lock() - - for i in range(TASKS): - p = multiprocessing.Process(target=value_func, args=(running, mutex)) - p.start() - - while running.value > 0: - time.sleep(0.08) - mutex.acquire() - print(running.value, end=' ') - sys.stdout.flush() - mutex.release() - - print() - print('No more running processes') - - -#### TEST_QUEUE - -def queue_func(queue): - for i in range(30): - time.sleep(0.5 * random.random()) - queue.put(i*i) - queue.put('STOP') - -def test_queue(): - q = multiprocessing.Queue() - - p = multiprocessing.Process(target=queue_func, args=(q,)) - p.start() - - o = None - while o != 'STOP': - try: - o = q.get(timeout=0.3) - print(o, end=' ') - sys.stdout.flush() - except Empty: - print('TIMEOUT') - - print() - - -#### TEST_CONDITION - -def condition_func(cond): - cond.acquire() - print('\t' + str(cond)) - time.sleep(2) - print('\tchild is notifying') - print('\t' + str(cond)) - cond.notify() - cond.release() - -def test_condition(): - cond = multiprocessing.Condition() - - p = multiprocessing.Process(target=condition_func, args=(cond,)) - print(cond) - - cond.acquire() - print(cond) - cond.acquire() - print(cond) - - p.start() - - print('main is waiting') - cond.wait() - print('main has woken up') - - print(cond) - cond.release() - print(cond) - cond.release() - - p.join() - print(cond) - - -#### TEST_SEMAPHORE - -def semaphore_func(sema, mutex, running): - sema.acquire() - - mutex.acquire() - running.value += 1 - print(running.value, 'tasks are running') - mutex.release() - - random.seed() - time.sleep(random.random()*2) - - mutex.acquire() - running.value -= 1 - print('%s has finished' % multiprocessing.current_process()) - mutex.release() - - sema.release() - -def test_semaphore(): - sema = multiprocessing.Semaphore(3) - mutex = multiprocessing.RLock() - running = multiprocessing.Value('i', 0) - - processes = [ - multiprocessing.Process(target=semaphore_func, - args=(sema, mutex, running)) - for i in range(10) - ] - - for p in processes: - p.start() - - for p in processes: - p.join() - - -#### TEST_JOIN_TIMEOUT - -def join_timeout_func(): - print('\tchild sleeping') - time.sleep(5.5) - print('\n\tchild terminating') - -def test_join_timeout(): - p = multiprocessing.Process(target=join_timeout_func) - p.start() - - print('waiting for process to finish') - - while 1: - p.join(timeout=1) - if not p.is_alive(): - break - print('.', end=' ') - sys.stdout.flush() - - -#### TEST_EVENT - -def event_func(event): - print('\t%r is waiting' % multiprocessing.current_process()) - event.wait() - print('\t%r has woken up' % multiprocessing.current_process()) - -def test_event(): - event = multiprocessing.Event() - - processes = [multiprocessing.Process(target=event_func, args=(event,)) - for i in range(5)] - - for p in processes: - p.start() - - print('main is sleeping') - time.sleep(2) - - print('main is setting event') - event.set() - - for p in processes: - p.join() - - -#### TEST_SHAREDVALUES - -def sharedvalues_func(values, arrays, shared_values, shared_arrays): - for i in range(len(values)): - v = values[i][1] - sv = shared_values[i].value - assert v == sv - - for i in range(len(values)): - a = arrays[i][1] - sa = list(shared_arrays[i][:]) - assert a == sa - - print('Tests passed') - -def test_sharedvalues(): - values = [ - ('i', 10), - ('h', -2), - ('d', 1.25) - ] - arrays = [ - ('i', list(range(100))), - ('d', [0.25 * i for i in range(100)]), - ('H', list(range(1000))) - ] - - shared_values = [multiprocessing.Value(id, v) for id, v in values] - shared_arrays = [multiprocessing.Array(id, a) for id, a in arrays] - - p = multiprocessing.Process( - target=sharedvalues_func, - args=(values, arrays, shared_values, shared_arrays) - ) - p.start() - p.join() - - assert p.exitcode == 0 - - -#### - -def test(namespace=multiprocessing): - global multiprocessing - - multiprocessing = namespace - - for func in [test_value, test_queue, test_condition, - test_semaphore, test_join_timeout, test_event, - test_sharedvalues]: - - print('\n\t######## %s\n' % func.__name__) - func() - - ignore = multiprocessing.active_children() # cleanup any old processes - if hasattr(multiprocessing, '_debug_info'): - info = multiprocessing._debug_info() - if info: - print(info) - raise ValueError('there should be no positive refcounts left') - - -if __name__ == '__main__': - multiprocessing.freeze_support() - - assert len(sys.argv) in (1, 2) - - if len(sys.argv) == 1 or sys.argv[1] == 'processes': - print(' Using processes '.center(79, '-')) - namespace = multiprocessing - elif sys.argv[1] == 'manager': - print(' Using processes and a manager '.center(79, '-')) - namespace = multiprocessing.Manager() - namespace.Process = multiprocessing.Process - namespace.current_process = multiprocessing.current_process - namespace.active_children = multiprocessing.active_children - elif sys.argv[1] == 'threads': - print(' Using threads '.center(79, '-')) - import multiprocessing.dummy as namespace - else: - print('Usage:\n\t%s [processes | manager | threads]' % sys.argv[0]) - raise SystemExit(2) - - test(namespace) |