diff options
author | Richard Oudkerk <shibturn@gmail.com> | 2013-08-14 14:35:41 (GMT) |
---|---|---|
committer | Richard Oudkerk <shibturn@gmail.com> | 2013-08-14 14:35:41 (GMT) |
commit | 84ed9a68bd9a13252b376b21a9167dabae254325 (patch) | |
tree | ec8daa39fcf64b658bddf52f56ae47c0bdc2b091 /Doc/includes | |
parent | d06eeb4a2492b59d34ab69a2046dcae1f10ec593 (diff) | |
download | cpython-84ed9a68bd9a13252b376b21a9167dabae254325.zip cpython-84ed9a68bd9a13252b376b21a9167dabae254325.tar.gz cpython-84ed9a68bd9a13252b376b21a9167dabae254325.tar.bz2 |
Issue #8713: Support alternative start methods in multiprocessing on Unix.
See http://hg.python.org/sandbox/sbt#spawn
Diffstat (limited to 'Doc/includes')
-rw-r--r-- | Doc/includes/mp_benchmarks.py | 239 | ||||
-rw-r--r-- | Doc/includes/mp_newtype.py | 14 | ||||
-rw-r--r-- | Doc/includes/mp_pool.py | 337 | ||||
-rw-r--r-- | Doc/includes/mp_synchronize.py | 278 | ||||
-rw-r--r-- | Doc/includes/mp_webserver.py | 70 | ||||
-rw-r--r-- | Doc/includes/mp_workers.py | 13 |
6 files changed, 89 insertions, 862 deletions
diff --git a/Doc/includes/mp_benchmarks.py b/Doc/includes/mp_benchmarks.py deleted file mode 100644 index 3763ea9..0000000 --- a/Doc/includes/mp_benchmarks.py +++ /dev/null @@ -1,239 +0,0 @@ -# -# Simple benchmarks for the multiprocessing package -# -# Copyright (c) 2006-2008, R Oudkerk -# All rights reserved. -# - -import time -import multiprocessing -import threading -import queue -import gc - -_timer = time.perf_counter - -delta = 1 - - -#### TEST_QUEUESPEED - -def queuespeed_func(q, c, iterations): - a = '0' * 256 - c.acquire() - c.notify() - c.release() - - for i in range(iterations): - q.put(a) - - q.put('STOP') - -def test_queuespeed(Process, q, c): - elapsed = 0 - iterations = 1 - - while elapsed < delta: - iterations *= 2 - - p = Process(target=queuespeed_func, args=(q, c, iterations)) - c.acquire() - p.start() - c.wait() - c.release() - - result = None - t = _timer() - - while result != 'STOP': - result = q.get() - - elapsed = _timer() - t - - p.join() - - print(iterations, 'objects passed through the queue in', elapsed, 'seconds') - print('average number/sec:', iterations/elapsed) - - -#### TEST_PIPESPEED - -def pipe_func(c, cond, iterations): - a = '0' * 256 - cond.acquire() - cond.notify() - cond.release() - - for i in range(iterations): - c.send(a) - - c.send('STOP') - -def test_pipespeed(): - c, d = multiprocessing.Pipe() - cond = multiprocessing.Condition() - elapsed = 0 - iterations = 1 - - while elapsed < delta: - iterations *= 2 - - p = multiprocessing.Process(target=pipe_func, - args=(d, cond, iterations)) - cond.acquire() - p.start() - cond.wait() - cond.release() - - result = None - t = _timer() - - while result != 'STOP': - result = c.recv() - - elapsed = _timer() - t - p.join() - - print(iterations, 'objects passed through connection in',elapsed,'seconds') - print('average number/sec:', iterations/elapsed) - - -#### TEST_SEQSPEED - -def test_seqspeed(seq): - elapsed = 0 - iterations = 1 - - while elapsed < delta: - iterations *= 2 - - t = _timer() - - for i in range(iterations): - a = seq[5] - - elapsed = _timer() - t - - print(iterations, 'iterations in', elapsed, 'seconds') - print('average number/sec:', iterations/elapsed) - - -#### TEST_LOCK - -def test_lockspeed(l): - elapsed = 0 - iterations = 1 - - while elapsed < delta: - iterations *= 2 - - t = _timer() - - for i in range(iterations): - l.acquire() - l.release() - - elapsed = _timer() - t - - print(iterations, 'iterations in', elapsed, 'seconds') - print('average number/sec:', iterations/elapsed) - - -#### TEST_CONDITION - -def conditionspeed_func(c, N): - c.acquire() - c.notify() - - for i in range(N): - c.wait() - c.notify() - - c.release() - -def test_conditionspeed(Process, c): - elapsed = 0 - iterations = 1 - - while elapsed < delta: - iterations *= 2 - - c.acquire() - p = Process(target=conditionspeed_func, args=(c, iterations)) - p.start() - - c.wait() - - t = _timer() - - for i in range(iterations): - c.notify() - c.wait() - - elapsed = _timer() - t - - c.release() - p.join() - - print(iterations * 2, 'waits in', elapsed, 'seconds') - print('average number/sec:', iterations * 2 / elapsed) - -#### - -def test(): - manager = multiprocessing.Manager() - - gc.disable() - - print('\n\t######## testing Queue.Queue\n') - test_queuespeed(threading.Thread, queue.Queue(), - threading.Condition()) - print('\n\t######## testing multiprocessing.Queue\n') - test_queuespeed(multiprocessing.Process, multiprocessing.Queue(), - multiprocessing.Condition()) - print('\n\t######## testing Queue managed by server process\n') - test_queuespeed(multiprocessing.Process, manager.Queue(), - manager.Condition()) - print('\n\t######## testing multiprocessing.Pipe\n') - test_pipespeed() - - print() - - print('\n\t######## testing list\n') - test_seqspeed(list(range(10))) - print('\n\t######## testing list managed by server process\n') - test_seqspeed(manager.list(list(range(10)))) - print('\n\t######## testing Array("i", ..., lock=False)\n') - test_seqspeed(multiprocessing.Array('i', list(range(10)), lock=False)) - print('\n\t######## testing Array("i", ..., lock=True)\n') - test_seqspeed(multiprocessing.Array('i', list(range(10)), lock=True)) - - print() - - print('\n\t######## testing threading.Lock\n') - test_lockspeed(threading.Lock()) - print('\n\t######## testing threading.RLock\n') - test_lockspeed(threading.RLock()) - print('\n\t######## testing multiprocessing.Lock\n') - test_lockspeed(multiprocessing.Lock()) - print('\n\t######## testing multiprocessing.RLock\n') - test_lockspeed(multiprocessing.RLock()) - print('\n\t######## testing lock managed by server process\n') - test_lockspeed(manager.Lock()) - print('\n\t######## testing rlock managed by server process\n') - test_lockspeed(manager.RLock()) - - print() - - print('\n\t######## testing threading.Condition\n') - test_conditionspeed(threading.Thread, threading.Condition()) - print('\n\t######## testing multiprocessing.Condition\n') - test_conditionspeed(multiprocessing.Process, multiprocessing.Condition()) - print('\n\t######## testing condition managed by a server process\n') - test_conditionspeed(multiprocessing.Process, manager.Condition()) - - gc.enable() - -if __name__ == '__main__': - multiprocessing.freeze_support() - test() diff --git a/Doc/includes/mp_newtype.py b/Doc/includes/mp_newtype.py index 7291743..1c796a5 100644 --- a/Doc/includes/mp_newtype.py +++ b/Doc/includes/mp_newtype.py @@ -1,11 +1,3 @@ -# -# This module shows how to use arbitrary callables with a subclass of -# `BaseManager`. -# -# Copyright (c) 2006-2008, R Oudkerk -# All rights reserved. -# - from multiprocessing import freeze_support from multiprocessing.managers import BaseManager, BaseProxy import operator @@ -27,12 +19,10 @@ def baz(): # Proxy type for generator objects class GeneratorProxy(BaseProxy): - _exposed_ = ('next', '__next__') + _exposed_ = ['__next__'] def __iter__(self): return self def __next__(self): - return self._callmethod('next') - def __next__(self): return self._callmethod('__next__') # Function to return the operator module @@ -90,8 +80,6 @@ def test(): op = manager.operator() print('op.add(23, 45) =', op.add(23, 45)) print('op.pow(2, 94) =', op.pow(2, 94)) - print('op.getslice(range(10), 2, 6) =', op.getslice(list(range(10)), 2, 6)) - print('op.repeat(range(5), 3) =', op.repeat(list(range(5)), 3)) print('op._exposed_ =', op._exposed_) ## diff --git a/Doc/includes/mp_pool.py b/Doc/includes/mp_pool.py index 1578498..11101e1 100644 --- a/Doc/includes/mp_pool.py +++ b/Doc/includes/mp_pool.py @@ -1,10 +1,3 @@ -# -# A test of `multiprocessing.Pool` class -# -# Copyright (c) 2006-2008, R Oudkerk -# All rights reserved. -# - import multiprocessing import time import random @@ -46,269 +39,115 @@ def noop(x): # def test(): - print('cpu_count() = %d\n' % multiprocessing.cpu_count()) - - # - # Create pool - # - PROCESSES = 4 print('Creating pool with %d processes\n' % PROCESSES) - pool = multiprocessing.Pool(PROCESSES) - print('pool = %s' % pool) - print() - - # - # Tests - # - - TASKS = [(mul, (i, 7)) for i in range(10)] + \ - [(plus, (i, 8)) for i in range(10)] - - results = [pool.apply_async(calculate, t) for t in TASKS] - imap_it = pool.imap(calculatestar, TASKS) - imap_unordered_it = pool.imap_unordered(calculatestar, TASKS) - - print('Ordered results using pool.apply_async():') - for r in results: - print('\t', r.get()) - print() - - print('Ordered results using pool.imap():') - for x in imap_it: - print('\t', x) - print() - - print('Unordered results using pool.imap_unordered():') - for x in imap_unordered_it: - print('\t', x) - print() - - print('Ordered results using pool.map() --- will block till complete:') - for x in pool.map(calculatestar, TASKS): - print('\t', x) - print() - - # - # Simple benchmarks - # - - N = 100000 - print('def pow3(x): return x**3') - t = time.time() - A = list(map(pow3, range(N))) - print('\tmap(pow3, range(%d)):\n\t\t%s seconds' % \ - (N, time.time() - t)) + with multiprocessing.Pool(PROCESSES) as pool: + # + # Tests + # - t = time.time() - B = pool.map(pow3, range(N)) - print('\tpool.map(pow3, range(%d)):\n\t\t%s seconds' % \ - (N, time.time() - t)) + TASKS = [(mul, (i, 7)) for i in range(10)] + \ + [(plus, (i, 8)) for i in range(10)] - t = time.time() - C = list(pool.imap(pow3, range(N), chunksize=N//8)) - print('\tlist(pool.imap(pow3, range(%d), chunksize=%d)):\n\t\t%s' \ - ' seconds' % (N, N//8, time.time() - t)) + results = [pool.apply_async(calculate, t) for t in TASKS] + imap_it = pool.imap(calculatestar, TASKS) + imap_unordered_it = pool.imap_unordered(calculatestar, TASKS) - assert A == B == C, (len(A), len(B), len(C)) - print() + print('Ordered results using pool.apply_async():') + for r in results: + print('\t', r.get()) + print() - L = [None] * 1000000 - print('def noop(x): pass') - print('L = [None] * 1000000') + print('Ordered results using pool.imap():') + for x in imap_it: + print('\t', x) + print() - t = time.time() - A = list(map(noop, L)) - print('\tmap(noop, L):\n\t\t%s seconds' % \ - (time.time() - t)) + print('Unordered results using pool.imap_unordered():') + for x in imap_unordered_it: + print('\t', x) + print() - t = time.time() - B = pool.map(noop, L) - print('\tpool.map(noop, L):\n\t\t%s seconds' % \ - (time.time() - t)) + print('Ordered results using pool.map() --- will block till complete:') + for x in pool.map(calculatestar, TASKS): + print('\t', x) + print() - t = time.time() - C = list(pool.imap(noop, L, chunksize=len(L)//8)) - print('\tlist(pool.imap(noop, L, chunksize=%d)):\n\t\t%s seconds' % \ - (len(L)//8, time.time() - t)) + # + # Test error handling + # - assert A == B == C, (len(A), len(B), len(C)) - print() + print('Testing error handling:') - del A, B, C, L - - # - # Test error handling - # - - print('Testing error handling:') - - try: - print(pool.apply(f, (5,))) - except ZeroDivisionError: - print('\tGot ZeroDivisionError as expected from pool.apply()') - else: - raise AssertionError('expected ZeroDivisionError') - - try: - print(pool.map(f, list(range(10)))) - except ZeroDivisionError: - print('\tGot ZeroDivisionError as expected from pool.map()') - else: - raise AssertionError('expected ZeroDivisionError') - - try: - print(list(pool.imap(f, list(range(10))))) - except ZeroDivisionError: - print('\tGot ZeroDivisionError as expected from list(pool.imap())') - else: - raise AssertionError('expected ZeroDivisionError') - - it = pool.imap(f, list(range(10))) - for i in range(10): try: - x = next(it) + print(pool.apply(f, (5,))) except ZeroDivisionError: - if i == 5: - pass - except StopIteration: - break + print('\tGot ZeroDivisionError as expected from pool.apply()') else: - if i == 5: - raise AssertionError('expected ZeroDivisionError') - - assert i == 9 - print('\tGot ZeroDivisionError as expected from IMapIterator.next()') - print() + raise AssertionError('expected ZeroDivisionError') - # - # Testing timeouts - # - - print('Testing ApplyResult.get() with timeout:', end=' ') - res = pool.apply_async(calculate, TASKS[0]) - while 1: - sys.stdout.flush() try: - sys.stdout.write('\n\t%s' % res.get(0.02)) - break - except multiprocessing.TimeoutError: - sys.stdout.write('.') - print() - print() + print(pool.map(f, list(range(10)))) + except ZeroDivisionError: + print('\tGot ZeroDivisionError as expected from pool.map()') + else: + raise AssertionError('expected ZeroDivisionError') - print('Testing IMapIterator.next() with timeout:', end=' ') - it = pool.imap(calculatestar, TASKS) - while 1: - sys.stdout.flush() try: - sys.stdout.write('\n\t%s' % it.next(0.02)) - except StopIteration: - break - except multiprocessing.TimeoutError: - sys.stdout.write('.') - print() - print() - - # - # Testing callback - # - - print('Testing callback:') - - A = [] - B = [56, 0, 1, 8, 27, 64, 125, 216, 343, 512, 729] - - r = pool.apply_async(mul, (7, 8), callback=A.append) - r.wait() - - r = pool.map_async(pow3, list(range(10)), callback=A.extend) - r.wait() - - if A == B: - print('\tcallbacks succeeded\n') - else: - print('\t*** callbacks failed\n\t\t%s != %s\n' % (A, B)) - - # - # Check there are no outstanding tasks - # - - assert not pool._cache, 'cache = %r' % pool._cache - - # - # Check close() methods - # - - print('Testing close():') - - for worker in pool._pool: - assert worker.is_alive() - - result = pool.apply_async(time.sleep, [0.5]) - pool.close() - pool.join() - - assert result.get() is None - - for worker in pool._pool: - assert not worker.is_alive() - - print('\tclose() succeeded\n') - - # - # Check terminate() method - # - - print('Testing terminate():') - - pool = multiprocessing.Pool(2) - DELTA = 0.1 - ignore = pool.apply(pow3, [2]) - results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)] - pool.terminate() - pool.join() - - for worker in pool._pool: - assert not worker.is_alive() - - print('\tterminate() succeeded\n') - - # - # Check garbage collection - # - - print('Testing garbage collection:') - - pool = multiprocessing.Pool(2) - DELTA = 0.1 - processes = pool._pool - ignore = pool.apply(pow3, [2]) - results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)] - - results = pool = None - - time.sleep(DELTA * 2) - - for worker in processes: - assert not worker.is_alive() - - print('\tgarbage collection succeeded\n') + print(list(pool.imap(f, list(range(10))))) + except ZeroDivisionError: + print('\tGot ZeroDivisionError as expected from list(pool.imap())') + else: + raise AssertionError('expected ZeroDivisionError') + + it = pool.imap(f, list(range(10))) + for i in range(10): + try: + x = next(it) + except ZeroDivisionError: + if i == 5: + pass + except StopIteration: + break + else: + if i == 5: + raise AssertionError('expected ZeroDivisionError') + + assert i == 9 + print('\tGot ZeroDivisionError as expected from IMapIterator.next()') + print() + + # + # Testing timeouts + # + + print('Testing ApplyResult.get() with timeout:', end=' ') + res = pool.apply_async(calculate, TASKS[0]) + while 1: + sys.stdout.flush() + try: + sys.stdout.write('\n\t%s' % res.get(0.02)) + break + except multiprocessing.TimeoutError: + sys.stdout.write('.') + print() + print() + + print('Testing IMapIterator.next() with timeout:', end=' ') + it = pool.imap(calculatestar, TASKS) + while 1: + sys.stdout.flush() + try: + sys.stdout.write('\n\t%s' % it.next(0.02)) + except StopIteration: + break + except multiprocessing.TimeoutError: + sys.stdout.write('.') + print() + print() if __name__ == '__main__': multiprocessing.freeze_support() - - assert len(sys.argv) in (1, 2) - - if len(sys.argv) == 1 or sys.argv[1] == 'processes': - print(' Using processes '.center(79, '-')) - elif sys.argv[1] == 'threads': - print(' Using threads '.center(79, '-')) - import multiprocessing.dummy as multiprocessing - else: - print('Usage:\n\t%s [processes | threads]' % sys.argv[0]) - raise SystemExit(2) - test() diff --git a/Doc/includes/mp_synchronize.py b/Doc/includes/mp_synchronize.py deleted file mode 100644 index 81dbc38..0000000 --- a/Doc/includes/mp_synchronize.py +++ /dev/null @@ -1,278 +0,0 @@ -# -# A test file for the `multiprocessing` package -# -# Copyright (c) 2006-2008, R Oudkerk -# All rights reserved. -# - -import time -import sys -import random -from queue import Empty - -import multiprocessing # may get overwritten - - -#### TEST_VALUE - -def value_func(running, mutex): - random.seed() - time.sleep(random.random()*4) - - mutex.acquire() - print('\n\t\t\t' + str(multiprocessing.current_process()) + ' has finished') - running.value -= 1 - mutex.release() - -def test_value(): - TASKS = 10 - running = multiprocessing.Value('i', TASKS) - mutex = multiprocessing.Lock() - - for i in range(TASKS): - p = multiprocessing.Process(target=value_func, args=(running, mutex)) - p.start() - - while running.value > 0: - time.sleep(0.08) - mutex.acquire() - print(running.value, end=' ') - sys.stdout.flush() - mutex.release() - - print() - print('No more running processes') - - -#### TEST_QUEUE - -def queue_func(queue): - for i in range(30): - time.sleep(0.5 * random.random()) - queue.put(i*i) - queue.put('STOP') - -def test_queue(): - q = multiprocessing.Queue() - - p = multiprocessing.Process(target=queue_func, args=(q,)) - p.start() - - o = None - while o != 'STOP': - try: - o = q.get(timeout=0.3) - print(o, end=' ') - sys.stdout.flush() - except Empty: - print('TIMEOUT') - - print() - - -#### TEST_CONDITION - -def condition_func(cond): - cond.acquire() - print('\t' + str(cond)) - time.sleep(2) - print('\tchild is notifying') - print('\t' + str(cond)) - cond.notify() - cond.release() - -def test_condition(): - cond = multiprocessing.Condition() - - p = multiprocessing.Process(target=condition_func, args=(cond,)) - print(cond) - - cond.acquire() - print(cond) - cond.acquire() - print(cond) - - p.start() - - print('main is waiting') - cond.wait() - print('main has woken up') - - print(cond) - cond.release() - print(cond) - cond.release() - - p.join() - print(cond) - - -#### TEST_SEMAPHORE - -def semaphore_func(sema, mutex, running): - sema.acquire() - - mutex.acquire() - running.value += 1 - print(running.value, 'tasks are running') - mutex.release() - - random.seed() - time.sleep(random.random()*2) - - mutex.acquire() - running.value -= 1 - print('%s has finished' % multiprocessing.current_process()) - mutex.release() - - sema.release() - -def test_semaphore(): - sema = multiprocessing.Semaphore(3) - mutex = multiprocessing.RLock() - running = multiprocessing.Value('i', 0) - - processes = [ - multiprocessing.Process(target=semaphore_func, - args=(sema, mutex, running)) - for i in range(10) - ] - - for p in processes: - p.start() - - for p in processes: - p.join() - - -#### TEST_JOIN_TIMEOUT - -def join_timeout_func(): - print('\tchild sleeping') - time.sleep(5.5) - print('\n\tchild terminating') - -def test_join_timeout(): - p = multiprocessing.Process(target=join_timeout_func) - p.start() - - print('waiting for process to finish') - - while 1: - p.join(timeout=1) - if not p.is_alive(): - break - print('.', end=' ') - sys.stdout.flush() - - -#### TEST_EVENT - -def event_func(event): - print('\t%r is waiting' % multiprocessing.current_process()) - event.wait() - print('\t%r has woken up' % multiprocessing.current_process()) - -def test_event(): - event = multiprocessing.Event() - - processes = [multiprocessing.Process(target=event_func, args=(event,)) - for i in range(5)] - - for p in processes: - p.start() - - print('main is sleeping') - time.sleep(2) - - print('main is setting event') - event.set() - - for p in processes: - p.join() - - -#### TEST_SHAREDVALUES - -def sharedvalues_func(values, arrays, shared_values, shared_arrays): - for i in range(len(values)): - v = values[i][1] - sv = shared_values[i].value - assert v == sv - - for i in range(len(values)): - a = arrays[i][1] - sa = list(shared_arrays[i][:]) - assert a == sa - - print('Tests passed') - -def test_sharedvalues(): - values = [ - ('i', 10), - ('h', -2), - ('d', 1.25) - ] - arrays = [ - ('i', list(range(100))), - ('d', [0.25 * i for i in range(100)]), - ('H', list(range(1000))) - ] - - shared_values = [multiprocessing.Value(id, v) for id, v in values] - shared_arrays = [multiprocessing.Array(id, a) for id, a in arrays] - - p = multiprocessing.Process( - target=sharedvalues_func, - args=(values, arrays, shared_values, shared_arrays) - ) - p.start() - p.join() - - assert p.exitcode == 0 - - -#### - -def test(namespace=multiprocessing): - global multiprocessing - - multiprocessing = namespace - - for func in [test_value, test_queue, test_condition, - test_semaphore, test_join_timeout, test_event, - test_sharedvalues]: - - print('\n\t######## %s\n' % func.__name__) - func() - - ignore = multiprocessing.active_children() # cleanup any old processes - if hasattr(multiprocessing, '_debug_info'): - info = multiprocessing._debug_info() - if info: - print(info) - raise ValueError('there should be no positive refcounts left') - - -if __name__ == '__main__': - multiprocessing.freeze_support() - - assert len(sys.argv) in (1, 2) - - if len(sys.argv) == 1 or sys.argv[1] == 'processes': - print(' Using processes '.center(79, '-')) - namespace = multiprocessing - elif sys.argv[1] == 'manager': - print(' Using processes and a manager '.center(79, '-')) - namespace = multiprocessing.Manager() - namespace.Process = multiprocessing.Process - namespace.current_process = multiprocessing.current_process - namespace.active_children = multiprocessing.active_children - elif sys.argv[1] == 'threads': - print(' Using threads '.center(79, '-')) - import multiprocessing.dummy as namespace - else: - print('Usage:\n\t%s [processes | manager | threads]' % sys.argv[0]) - raise SystemExit(2) - - test(namespace) diff --git a/Doc/includes/mp_webserver.py b/Doc/includes/mp_webserver.py deleted file mode 100644 index 651024d..0000000 --- a/Doc/includes/mp_webserver.py +++ /dev/null @@ -1,70 +0,0 @@ -# -# Example where a pool of http servers share a single listening socket -# -# On Windows this module depends on the ability to pickle a socket -# object so that the worker processes can inherit a copy of the server -# object. (We import `multiprocessing.reduction` to enable this pickling.) -# -# Not sure if we should synchronize access to `socket.accept()` method by -# using a process-shared lock -- does not seem to be necessary. -# -# Copyright (c) 2006-2008, R Oudkerk -# All rights reserved. -# - -import os -import sys - -from multiprocessing import Process, current_process, freeze_support -from http.server import HTTPServer -from http.server import SimpleHTTPRequestHandler - -if sys.platform == 'win32': - import multiprocessing.reduction # make sockets pickable/inheritable - - -def note(format, *args): - sys.stderr.write('[%s]\t%s\n' % (current_process().name, format % args)) - - -class RequestHandler(SimpleHTTPRequestHandler): - # we override log_message() to show which process is handling the request - def log_message(self, format, *args): - note(format, *args) - -def serve_forever(server): - note('starting server') - try: - server.serve_forever() - except KeyboardInterrupt: - pass - - -def runpool(address, number_of_processes): - # create a single server object -- children will each inherit a copy - server = HTTPServer(address, RequestHandler) - - # create child processes to act as workers - for i in range(number_of_processes - 1): - Process(target=serve_forever, args=(server,)).start() - - # main process also acts as a worker - serve_forever(server) - - -def test(): - DIR = os.path.join(os.path.dirname(__file__), '..') - ADDRESS = ('localhost', 8000) - NUMBER_OF_PROCESSES = 4 - - print('Serving at http://%s:%d using %d worker processes' % \ - (ADDRESS[0], ADDRESS[1], NUMBER_OF_PROCESSES)) - print('To exit press Ctrl-' + ['C', 'Break'][sys.platform=='win32']) - - os.chdir(DIR) - runpool(ADDRESS, NUMBER_OF_PROCESSES) - - -if __name__ == '__main__': - freeze_support() - test() diff --git a/Doc/includes/mp_workers.py b/Doc/includes/mp_workers.py index e66d97b..3b92269 100644 --- a/Doc/includes/mp_workers.py +++ b/Doc/includes/mp_workers.py @@ -1,16 +1,3 @@ -# -# Simple example which uses a pool of workers to carry out some tasks. -# -# Notice that the results will probably not come out of the output -# queue in the same in the same order as the corresponding tasks were -# put on the input queue. If it is important to get the results back -# in the original order then consider using `Pool.map()` or -# `Pool.imap()` (which will save on the amount of code needed anyway). -# -# Copyright (c) 2006-2008, R Oudkerk -# All rights reserved. -# - import time import random |