diff options
Diffstat (limited to 'Doc/includes/mp_synchronize.py')
-rw-r--r-- | Doc/includes/mp_synchronize.py | 273 |
1 files changed, 273 insertions, 0 deletions
diff --git a/Doc/includes/mp_synchronize.py b/Doc/includes/mp_synchronize.py new file mode 100644 index 0000000..8cf11bd --- /dev/null +++ b/Doc/includes/mp_synchronize.py @@ -0,0 +1,273 @@ +# +# A test file for the `multiprocessing` package +# + +import time, sys, random +from Queue import Empty + +import multiprocessing # may get overwritten + + +#### TEST_VALUE + +def value_func(running, mutex): + random.seed() + time.sleep(random.random()*4) + + mutex.acquire() + print '\n\t\t\t' + str(multiprocessing.current_process()) + ' has finished' + running.value -= 1 + mutex.release() + +def test_value(): + TASKS = 10 + running = multiprocessing.Value('i', TASKS) + mutex = multiprocessing.Lock() + + for i in range(TASKS): + p = multiprocessing.Process(target=value_func, args=(running, mutex)) + p.start() + + while running.value > 0: + time.sleep(0.08) + mutex.acquire() + print running.value, + sys.stdout.flush() + mutex.release() + + print + print 'No more running processes' + + +#### TEST_QUEUE + +def queue_func(queue): + for i in range(30): + time.sleep(0.5 * random.random()) + queue.put(i*i) + queue.put('STOP') + +def test_queue(): + q = multiprocessing.Queue() + + p = multiprocessing.Process(target=queue_func, args=(q,)) + p.start() + + o = None + while o != 'STOP': + try: + o = q.get(timeout=0.3) + print o, + sys.stdout.flush() + except Empty: + print 'TIMEOUT' + + print + + +#### TEST_CONDITION + +def condition_func(cond): + cond.acquire() + print '\t' + str(cond) + time.sleep(2) + print '\tchild is notifying' + print '\t' + str(cond) + cond.notify() + cond.release() + +def test_condition(): + cond = multiprocessing.Condition() + + p = multiprocessing.Process(target=condition_func, args=(cond,)) + print cond + + cond.acquire() + print cond + cond.acquire() + print cond + + p.start() + + print 'main is waiting' + cond.wait() + print 'main has woken up' + + print cond + cond.release() + print cond + cond.release() + + p.join() + print cond + + +#### TEST_SEMAPHORE + +def semaphore_func(sema, mutex, running): + sema.acquire() + + mutex.acquire() + running.value += 1 + print running.value, 'tasks are running' + mutex.release() + + random.seed() + time.sleep(random.random()*2) + + mutex.acquire() + running.value -= 1 + print '%s has finished' % multiprocessing.current_process() + mutex.release() + + sema.release() + +def test_semaphore(): + sema = multiprocessing.Semaphore(3) + mutex = multiprocessing.RLock() + running = multiprocessing.Value('i', 0) + + processes = [ + multiprocessing.Process(target=semaphore_func, + args=(sema, mutex, running)) + for i in range(10) + ] + + for p in processes: + p.start() + + for p in processes: + p.join() + + +#### TEST_JOIN_TIMEOUT + +def join_timeout_func(): + print '\tchild sleeping' + time.sleep(5.5) + print '\n\tchild terminating' + +def test_join_timeout(): + p = multiprocessing.Process(target=join_timeout_func) + p.start() + + print 'waiting for process to finish' + + while 1: + p.join(timeout=1) + if not p.is_alive(): + break + print '.', + sys.stdout.flush() + + +#### TEST_EVENT + +def event_func(event): + print '\t%r is waiting' % multiprocessing.current_process() + event.wait() + print '\t%r has woken up' % multiprocessing.current_process() + +def test_event(): + event = multiprocessing.Event() + + processes = [multiprocessing.Process(target=event_func, args=(event,)) + for i in range(5)] + + for p in processes: + p.start() + + print 'main is sleeping' + time.sleep(2) + + print 'main is setting event' + event.set() + + for p in processes: + p.join() + + +#### TEST_SHAREDVALUES + +def sharedvalues_func(values, arrays, shared_values, shared_arrays): + for i in range(len(values)): + v = values[i][1] + sv = shared_values[i].value + assert v == sv + + for i in range(len(values)): + a = arrays[i][1] + sa = list(shared_arrays[i][:]) + assert a == sa + + print 'Tests passed' + +def test_sharedvalues(): + values = [ + ('i', 10), + ('h', -2), + ('d', 1.25) + ] + arrays = [ + ('i', range(100)), + ('d', [0.25 * i for i in range(100)]), + ('H', range(1000)) + ] + + shared_values = [multiprocessing.Value(id, v) for id, v in values] + shared_arrays = [multiprocessing.Array(id, a) for id, a in arrays] + + p = multiprocessing.Process( + target=sharedvalues_func, + args=(values, arrays, shared_values, shared_arrays) + ) + p.start() + p.join() + + assert p.get_exitcode() == 0 + + +#### + +def test(namespace=multiprocessing): + global multiprocessing + + multiprocessing = namespace + + for func in [ test_value, test_queue, test_condition, + test_semaphore, test_join_timeout, test_event, + test_sharedvalues ]: + + print '\n\t######## %s\n' % func.__name__ + func() + + ignore = multiprocessing.active_children() # cleanup any old processes + if hasattr(multiprocessing, '_debug_info'): + info = multiprocessing._debug_info() + if info: + print info + raise ValueError, 'there should be no positive refcounts left' + + +if __name__ == '__main__': + multiprocessing.freeze_support() + + assert len(sys.argv) in (1, 2) + + if len(sys.argv) == 1 or sys.argv[1] == 'processes': + print ' Using processes '.center(79, '-') + namespace = multiprocessing + elif sys.argv[1] == 'manager': + print ' Using processes and a manager '.center(79, '-') + namespace = multiprocessing.Manager() + namespace.Process = multiprocessing.Process + namespace.current_process = multiprocessing.current_process + namespace.active_children = multiprocessing.active_children + elif sys.argv[1] == 'threads': + print ' Using threads '.center(79, '-') + import multiprocessing.dummy as namespace + else: + print 'Usage:\n\t%s [processes | manager | threads]' % sys.argv[0] + raise SystemExit, 2 + + test(namespace) |