summaryrefslogtreecommitdiffstats
path: root/Doc/includes/mp_synchronize.py
diff options
context:
space:
mode:
Diffstat (limited to 'Doc/includes/mp_synchronize.py')
-rw-r--r--Doc/includes/mp_synchronize.py273
1 files changed, 273 insertions, 0 deletions
diff --git a/Doc/includes/mp_synchronize.py b/Doc/includes/mp_synchronize.py
new file mode 100644
index 0000000..8cf11bd
--- /dev/null
+++ b/Doc/includes/mp_synchronize.py
@@ -0,0 +1,273 @@
+#
+# A test file for the `multiprocessing` package
+#
+
+import time, sys, random
+from Queue import Empty
+
+import multiprocessing # may get overwritten
+
+
+#### TEST_VALUE
+
+def value_func(running, mutex):
+ random.seed()
+ time.sleep(random.random()*4)
+
+ mutex.acquire()
+ print '\n\t\t\t' + str(multiprocessing.current_process()) + ' has finished'
+ running.value -= 1
+ mutex.release()
+
+def test_value():
+ TASKS = 10
+ running = multiprocessing.Value('i', TASKS)
+ mutex = multiprocessing.Lock()
+
+ for i in range(TASKS):
+ p = multiprocessing.Process(target=value_func, args=(running, mutex))
+ p.start()
+
+ while running.value > 0:
+ time.sleep(0.08)
+ mutex.acquire()
+ print running.value,
+ sys.stdout.flush()
+ mutex.release()
+
+ print
+ print 'No more running processes'
+
+
+#### TEST_QUEUE
+
+def queue_func(queue):
+ for i in range(30):
+ time.sleep(0.5 * random.random())
+ queue.put(i*i)
+ queue.put('STOP')
+
+def test_queue():
+ q = multiprocessing.Queue()
+
+ p = multiprocessing.Process(target=queue_func, args=(q,))
+ p.start()
+
+ o = None
+ while o != 'STOP':
+ try:
+ o = q.get(timeout=0.3)
+ print o,
+ sys.stdout.flush()
+ except Empty:
+ print 'TIMEOUT'
+
+ print
+
+
+#### TEST_CONDITION
+
+def condition_func(cond):
+ cond.acquire()
+ print '\t' + str(cond)
+ time.sleep(2)
+ print '\tchild is notifying'
+ print '\t' + str(cond)
+ cond.notify()
+ cond.release()
+
+def test_condition():
+ cond = multiprocessing.Condition()
+
+ p = multiprocessing.Process(target=condition_func, args=(cond,))
+ print cond
+
+ cond.acquire()
+ print cond
+ cond.acquire()
+ print cond
+
+ p.start()
+
+ print 'main is waiting'
+ cond.wait()
+ print 'main has woken up'
+
+ print cond
+ cond.release()
+ print cond
+ cond.release()
+
+ p.join()
+ print cond
+
+
+#### TEST_SEMAPHORE
+
+def semaphore_func(sema, mutex, running):
+ sema.acquire()
+
+ mutex.acquire()
+ running.value += 1
+ print running.value, 'tasks are running'
+ mutex.release()
+
+ random.seed()
+ time.sleep(random.random()*2)
+
+ mutex.acquire()
+ running.value -= 1
+ print '%s has finished' % multiprocessing.current_process()
+ mutex.release()
+
+ sema.release()
+
+def test_semaphore():
+ sema = multiprocessing.Semaphore(3)
+ mutex = multiprocessing.RLock()
+ running = multiprocessing.Value('i', 0)
+
+ processes = [
+ multiprocessing.Process(target=semaphore_func,
+ args=(sema, mutex, running))
+ for i in range(10)
+ ]
+
+ for p in processes:
+ p.start()
+
+ for p in processes:
+ p.join()
+
+
+#### TEST_JOIN_TIMEOUT
+
+def join_timeout_func():
+ print '\tchild sleeping'
+ time.sleep(5.5)
+ print '\n\tchild terminating'
+
+def test_join_timeout():
+ p = multiprocessing.Process(target=join_timeout_func)
+ p.start()
+
+ print 'waiting for process to finish'
+
+ while 1:
+ p.join(timeout=1)
+ if not p.is_alive():
+ break
+ print '.',
+ sys.stdout.flush()
+
+
+#### TEST_EVENT
+
+def event_func(event):
+ print '\t%r is waiting' % multiprocessing.current_process()
+ event.wait()
+ print '\t%r has woken up' % multiprocessing.current_process()
+
+def test_event():
+ event = multiprocessing.Event()
+
+ processes = [multiprocessing.Process(target=event_func, args=(event,))
+ for i in range(5)]
+
+ for p in processes:
+ p.start()
+
+ print 'main is sleeping'
+ time.sleep(2)
+
+ print 'main is setting event'
+ event.set()
+
+ for p in processes:
+ p.join()
+
+
+#### TEST_SHAREDVALUES
+
+def sharedvalues_func(values, arrays, shared_values, shared_arrays):
+ for i in range(len(values)):
+ v = values[i][1]
+ sv = shared_values[i].value
+ assert v == sv
+
+ for i in range(len(values)):
+ a = arrays[i][1]
+ sa = list(shared_arrays[i][:])
+ assert a == sa
+
+ print 'Tests passed'
+
+def test_sharedvalues():
+ values = [
+ ('i', 10),
+ ('h', -2),
+ ('d', 1.25)
+ ]
+ arrays = [
+ ('i', range(100)),
+ ('d', [0.25 * i for i in range(100)]),
+ ('H', range(1000))
+ ]
+
+ shared_values = [multiprocessing.Value(id, v) for id, v in values]
+ shared_arrays = [multiprocessing.Array(id, a) for id, a in arrays]
+
+ p = multiprocessing.Process(
+ target=sharedvalues_func,
+ args=(values, arrays, shared_values, shared_arrays)
+ )
+ p.start()
+ p.join()
+
+ assert p.get_exitcode() == 0
+
+
+####
+
+def test(namespace=multiprocessing):
+ global multiprocessing
+
+ multiprocessing = namespace
+
+ for func in [ test_value, test_queue, test_condition,
+ test_semaphore, test_join_timeout, test_event,
+ test_sharedvalues ]:
+
+ print '\n\t######## %s\n' % func.__name__
+ func()
+
+ ignore = multiprocessing.active_children() # cleanup any old processes
+ if hasattr(multiprocessing, '_debug_info'):
+ info = multiprocessing._debug_info()
+ if info:
+ print info
+ raise ValueError, 'there should be no positive refcounts left'
+
+
+if __name__ == '__main__':
+ multiprocessing.freeze_support()
+
+ assert len(sys.argv) in (1, 2)
+
+ if len(sys.argv) == 1 or sys.argv[1] == 'processes':
+ print ' Using processes '.center(79, '-')
+ namespace = multiprocessing
+ elif sys.argv[1] == 'manager':
+ print ' Using processes and a manager '.center(79, '-')
+ namespace = multiprocessing.Manager()
+ namespace.Process = multiprocessing.Process
+ namespace.current_process = multiprocessing.current_process
+ namespace.active_children = multiprocessing.active_children
+ elif sys.argv[1] == 'threads':
+ print ' Using threads '.center(79, '-')
+ import multiprocessing.dummy as namespace
+ else:
+ print 'Usage:\n\t%s [processes | manager | threads]' % sys.argv[0]
+ raise SystemExit, 2
+
+ test(namespace)