summaryrefslogtreecommitdiffstats
path: root/Doc
diff options
context:
space:
mode:
authorRichard Oudkerk <shibturn@gmail.com>2013-08-14 14:35:41 (GMT)
committerRichard Oudkerk <shibturn@gmail.com>2013-08-14 14:35:41 (GMT)
commit84ed9a68bd9a13252b376b21a9167dabae254325 (patch)
treeec8daa39fcf64b658bddf52f56ae47c0bdc2b091 /Doc
parentd06eeb4a2492b59d34ab69a2046dcae1f10ec593 (diff)
downloadcpython-84ed9a68bd9a13252b376b21a9167dabae254325.zip
cpython-84ed9a68bd9a13252b376b21a9167dabae254325.tar.gz
cpython-84ed9a68bd9a13252b376b21a9167dabae254325.tar.bz2
Issue #8713: Support alternative start methods in multiprocessing on Unix.
See http://hg.python.org/sandbox/sbt#spawn
Diffstat (limited to 'Doc')
-rw-r--r--Doc/includes/mp_benchmarks.py239
-rw-r--r--Doc/includes/mp_newtype.py14
-rw-r--r--Doc/includes/mp_pool.py337
-rw-r--r--Doc/includes/mp_synchronize.py278
-rw-r--r--Doc/includes/mp_webserver.py70
-rw-r--r--Doc/includes/mp_workers.py13
-rw-r--r--Doc/library/multiprocessing.rst240
-rw-r--r--Doc/whatsnew/3.4.rst13
8 files changed, 258 insertions, 946 deletions
diff --git a/Doc/includes/mp_benchmarks.py b/Doc/includes/mp_benchmarks.py
deleted file mode 100644
index 3763ea9..0000000
--- a/Doc/includes/mp_benchmarks.py
+++ /dev/null
@@ -1,239 +0,0 @@
-#
-# Simple benchmarks for the multiprocessing package
-#
-# Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-
-import time
-import multiprocessing
-import threading
-import queue
-import gc
-
-_timer = time.perf_counter
-
-delta = 1
-
-
-#### TEST_QUEUESPEED
-
-def queuespeed_func(q, c, iterations):
- a = '0' * 256
- c.acquire()
- c.notify()
- c.release()
-
- for i in range(iterations):
- q.put(a)
-
- q.put('STOP')
-
-def test_queuespeed(Process, q, c):
- elapsed = 0
- iterations = 1
-
- while elapsed < delta:
- iterations *= 2
-
- p = Process(target=queuespeed_func, args=(q, c, iterations))
- c.acquire()
- p.start()
- c.wait()
- c.release()
-
- result = None
- t = _timer()
-
- while result != 'STOP':
- result = q.get()
-
- elapsed = _timer() - t
-
- p.join()
-
- print(iterations, 'objects passed through the queue in', elapsed, 'seconds')
- print('average number/sec:', iterations/elapsed)
-
-
-#### TEST_PIPESPEED
-
-def pipe_func(c, cond, iterations):
- a = '0' * 256
- cond.acquire()
- cond.notify()
- cond.release()
-
- for i in range(iterations):
- c.send(a)
-
- c.send('STOP')
-
-def test_pipespeed():
- c, d = multiprocessing.Pipe()
- cond = multiprocessing.Condition()
- elapsed = 0
- iterations = 1
-
- while elapsed < delta:
- iterations *= 2
-
- p = multiprocessing.Process(target=pipe_func,
- args=(d, cond, iterations))
- cond.acquire()
- p.start()
- cond.wait()
- cond.release()
-
- result = None
- t = _timer()
-
- while result != 'STOP':
- result = c.recv()
-
- elapsed = _timer() - t
- p.join()
-
- print(iterations, 'objects passed through connection in',elapsed,'seconds')
- print('average number/sec:', iterations/elapsed)
-
-
-#### TEST_SEQSPEED
-
-def test_seqspeed(seq):
- elapsed = 0
- iterations = 1
-
- while elapsed < delta:
- iterations *= 2
-
- t = _timer()
-
- for i in range(iterations):
- a = seq[5]
-
- elapsed = _timer() - t
-
- print(iterations, 'iterations in', elapsed, 'seconds')
- print('average number/sec:', iterations/elapsed)
-
-
-#### TEST_LOCK
-
-def test_lockspeed(l):
- elapsed = 0
- iterations = 1
-
- while elapsed < delta:
- iterations *= 2
-
- t = _timer()
-
- for i in range(iterations):
- l.acquire()
- l.release()
-
- elapsed = _timer() - t
-
- print(iterations, 'iterations in', elapsed, 'seconds')
- print('average number/sec:', iterations/elapsed)
-
-
-#### TEST_CONDITION
-
-def conditionspeed_func(c, N):
- c.acquire()
- c.notify()
-
- for i in range(N):
- c.wait()
- c.notify()
-
- c.release()
-
-def test_conditionspeed(Process, c):
- elapsed = 0
- iterations = 1
-
- while elapsed < delta:
- iterations *= 2
-
- c.acquire()
- p = Process(target=conditionspeed_func, args=(c, iterations))
- p.start()
-
- c.wait()
-
- t = _timer()
-
- for i in range(iterations):
- c.notify()
- c.wait()
-
- elapsed = _timer() - t
-
- c.release()
- p.join()
-
- print(iterations * 2, 'waits in', elapsed, 'seconds')
- print('average number/sec:', iterations * 2 / elapsed)
-
-####
-
-def test():
- manager = multiprocessing.Manager()
-
- gc.disable()
-
- print('\n\t######## testing Queue.Queue\n')
- test_queuespeed(threading.Thread, queue.Queue(),
- threading.Condition())
- print('\n\t######## testing multiprocessing.Queue\n')
- test_queuespeed(multiprocessing.Process, multiprocessing.Queue(),
- multiprocessing.Condition())
- print('\n\t######## testing Queue managed by server process\n')
- test_queuespeed(multiprocessing.Process, manager.Queue(),
- manager.Condition())
- print('\n\t######## testing multiprocessing.Pipe\n')
- test_pipespeed()
-
- print()
-
- print('\n\t######## testing list\n')
- test_seqspeed(list(range(10)))
- print('\n\t######## testing list managed by server process\n')
- test_seqspeed(manager.list(list(range(10))))
- print('\n\t######## testing Array("i", ..., lock=False)\n')
- test_seqspeed(multiprocessing.Array('i', list(range(10)), lock=False))
- print('\n\t######## testing Array("i", ..., lock=True)\n')
- test_seqspeed(multiprocessing.Array('i', list(range(10)), lock=True))
-
- print()
-
- print('\n\t######## testing threading.Lock\n')
- test_lockspeed(threading.Lock())
- print('\n\t######## testing threading.RLock\n')
- test_lockspeed(threading.RLock())
- print('\n\t######## testing multiprocessing.Lock\n')
- test_lockspeed(multiprocessing.Lock())
- print('\n\t######## testing multiprocessing.RLock\n')
- test_lockspeed(multiprocessing.RLock())
- print('\n\t######## testing lock managed by server process\n')
- test_lockspeed(manager.Lock())
- print('\n\t######## testing rlock managed by server process\n')
- test_lockspeed(manager.RLock())
-
- print()
-
- print('\n\t######## testing threading.Condition\n')
- test_conditionspeed(threading.Thread, threading.Condition())
- print('\n\t######## testing multiprocessing.Condition\n')
- test_conditionspeed(multiprocessing.Process, multiprocessing.Condition())
- print('\n\t######## testing condition managed by a server process\n')
- test_conditionspeed(multiprocessing.Process, manager.Condition())
-
- gc.enable()
-
-if __name__ == '__main__':
- multiprocessing.freeze_support()
- test()
diff --git a/Doc/includes/mp_newtype.py b/Doc/includes/mp_newtype.py
index 7291743..1c796a5 100644
--- a/Doc/includes/mp_newtype.py
+++ b/Doc/includes/mp_newtype.py
@@ -1,11 +1,3 @@
-#
-# This module shows how to use arbitrary callables with a subclass of
-# `BaseManager`.
-#
-# Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-
from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator
@@ -27,12 +19,10 @@ def baz():
# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
- _exposed_ = ('next', '__next__')
+ _exposed_ = ['__next__']
def __iter__(self):
return self
def __next__(self):
- return self._callmethod('next')
- def __next__(self):
return self._callmethod('__next__')
# Function to return the operator module
@@ -90,8 +80,6 @@ def test():
op = manager.operator()
print('op.add(23, 45) =', op.add(23, 45))
print('op.pow(2, 94) =', op.pow(2, 94))
- print('op.getslice(range(10), 2, 6) =', op.getslice(list(range(10)), 2, 6))
- print('op.repeat(range(5), 3) =', op.repeat(list(range(5)), 3))
print('op._exposed_ =', op._exposed_)
##
diff --git a/Doc/includes/mp_pool.py b/Doc/includes/mp_pool.py
index 1578498..11101e1 100644
--- a/Doc/includes/mp_pool.py
+++ b/Doc/includes/mp_pool.py
@@ -1,10 +1,3 @@
-#
-# A test of `multiprocessing.Pool` class
-#
-# Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-
import multiprocessing
import time
import random
@@ -46,269 +39,115 @@ def noop(x):
#
def test():
- print('cpu_count() = %d\n' % multiprocessing.cpu_count())
-
- #
- # Create pool
- #
-
PROCESSES = 4
print('Creating pool with %d processes\n' % PROCESSES)
- pool = multiprocessing.Pool(PROCESSES)
- print('pool = %s' % pool)
- print()
-
- #
- # Tests
- #
-
- TASKS = [(mul, (i, 7)) for i in range(10)] + \
- [(plus, (i, 8)) for i in range(10)]
-
- results = [pool.apply_async(calculate, t) for t in TASKS]
- imap_it = pool.imap(calculatestar, TASKS)
- imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
-
- print('Ordered results using pool.apply_async():')
- for r in results:
- print('\t', r.get())
- print()
-
- print('Ordered results using pool.imap():')
- for x in imap_it:
- print('\t', x)
- print()
-
- print('Unordered results using pool.imap_unordered():')
- for x in imap_unordered_it:
- print('\t', x)
- print()
-
- print('Ordered results using pool.map() --- will block till complete:')
- for x in pool.map(calculatestar, TASKS):
- print('\t', x)
- print()
-
- #
- # Simple benchmarks
- #
-
- N = 100000
- print('def pow3(x): return x**3')
- t = time.time()
- A = list(map(pow3, range(N)))
- print('\tmap(pow3, range(%d)):\n\t\t%s seconds' % \
- (N, time.time() - t))
+ with multiprocessing.Pool(PROCESSES) as pool:
+ #
+ # Tests
+ #
- t = time.time()
- B = pool.map(pow3, range(N))
- print('\tpool.map(pow3, range(%d)):\n\t\t%s seconds' % \
- (N, time.time() - t))
+ TASKS = [(mul, (i, 7)) for i in range(10)] + \
+ [(plus, (i, 8)) for i in range(10)]
- t = time.time()
- C = list(pool.imap(pow3, range(N), chunksize=N//8))
- print('\tlist(pool.imap(pow3, range(%d), chunksize=%d)):\n\t\t%s' \
- ' seconds' % (N, N//8, time.time() - t))
+ results = [pool.apply_async(calculate, t) for t in TASKS]
+ imap_it = pool.imap(calculatestar, TASKS)
+ imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
- assert A == B == C, (len(A), len(B), len(C))
- print()
+ print('Ordered results using pool.apply_async():')
+ for r in results:
+ print('\t', r.get())
+ print()
- L = [None] * 1000000
- print('def noop(x): pass')
- print('L = [None] * 1000000')
+ print('Ordered results using pool.imap():')
+ for x in imap_it:
+ print('\t', x)
+ print()
- t = time.time()
- A = list(map(noop, L))
- print('\tmap(noop, L):\n\t\t%s seconds' % \
- (time.time() - t))
+ print('Unordered results using pool.imap_unordered():')
+ for x in imap_unordered_it:
+ print('\t', x)
+ print()
- t = time.time()
- B = pool.map(noop, L)
- print('\tpool.map(noop, L):\n\t\t%s seconds' % \
- (time.time() - t))
+ print('Ordered results using pool.map() --- will block till complete:')
+ for x in pool.map(calculatestar, TASKS):
+ print('\t', x)
+ print()
- t = time.time()
- C = list(pool.imap(noop, L, chunksize=len(L)//8))
- print('\tlist(pool.imap(noop, L, chunksize=%d)):\n\t\t%s seconds' % \
- (len(L)//8, time.time() - t))
+ #
+ # Test error handling
+ #
- assert A == B == C, (len(A), len(B), len(C))
- print()
+ print('Testing error handling:')
- del A, B, C, L
-
- #
- # Test error handling
- #
-
- print('Testing error handling:')
-
- try:
- print(pool.apply(f, (5,)))
- except ZeroDivisionError:
- print('\tGot ZeroDivisionError as expected from pool.apply()')
- else:
- raise AssertionError('expected ZeroDivisionError')
-
- try:
- print(pool.map(f, list(range(10))))
- except ZeroDivisionError:
- print('\tGot ZeroDivisionError as expected from pool.map()')
- else:
- raise AssertionError('expected ZeroDivisionError')
-
- try:
- print(list(pool.imap(f, list(range(10)))))
- except ZeroDivisionError:
- print('\tGot ZeroDivisionError as expected from list(pool.imap())')
- else:
- raise AssertionError('expected ZeroDivisionError')
-
- it = pool.imap(f, list(range(10)))
- for i in range(10):
try:
- x = next(it)
+ print(pool.apply(f, (5,)))
except ZeroDivisionError:
- if i == 5:
- pass
- except StopIteration:
- break
+ print('\tGot ZeroDivisionError as expected from pool.apply()')
else:
- if i == 5:
- raise AssertionError('expected ZeroDivisionError')
-
- assert i == 9
- print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
- print()
+ raise AssertionError('expected ZeroDivisionError')
- #
- # Testing timeouts
- #
-
- print('Testing ApplyResult.get() with timeout:', end=' ')
- res = pool.apply_async(calculate, TASKS[0])
- while 1:
- sys.stdout.flush()
try:
- sys.stdout.write('\n\t%s' % res.get(0.02))
- break
- except multiprocessing.TimeoutError:
- sys.stdout.write('.')
- print()
- print()
+ print(pool.map(f, list(range(10))))
+ except ZeroDivisionError:
+ print('\tGot ZeroDivisionError as expected from pool.map()')
+ else:
+ raise AssertionError('expected ZeroDivisionError')
- print('Testing IMapIterator.next() with timeout:', end=' ')
- it = pool.imap(calculatestar, TASKS)
- while 1:
- sys.stdout.flush()
try:
- sys.stdout.write('\n\t%s' % it.next(0.02))
- except StopIteration:
- break
- except multiprocessing.TimeoutError:
- sys.stdout.write('.')
- print()
- print()
-
- #
- # Testing callback
- #
-
- print('Testing callback:')
-
- A = []
- B = [56, 0, 1, 8, 27, 64, 125, 216, 343, 512, 729]
-
- r = pool.apply_async(mul, (7, 8), callback=A.append)
- r.wait()
-
- r = pool.map_async(pow3, list(range(10)), callback=A.extend)
- r.wait()
-
- if A == B:
- print('\tcallbacks succeeded\n')
- else:
- print('\t*** callbacks failed\n\t\t%s != %s\n' % (A, B))
-
- #
- # Check there are no outstanding tasks
- #
-
- assert not pool._cache, 'cache = %r' % pool._cache
-
- #
- # Check close() methods
- #
-
- print('Testing close():')
-
- for worker in pool._pool:
- assert worker.is_alive()
-
- result = pool.apply_async(time.sleep, [0.5])
- pool.close()
- pool.join()
-
- assert result.get() is None
-
- for worker in pool._pool:
- assert not worker.is_alive()
-
- print('\tclose() succeeded\n')
-
- #
- # Check terminate() method
- #
-
- print('Testing terminate():')
-
- pool = multiprocessing.Pool(2)
- DELTA = 0.1
- ignore = pool.apply(pow3, [2])
- results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
- pool.terminate()
- pool.join()
-
- for worker in pool._pool:
- assert not worker.is_alive()
-
- print('\tterminate() succeeded\n')
-
- #
- # Check garbage collection
- #
-
- print('Testing garbage collection:')
-
- pool = multiprocessing.Pool(2)
- DELTA = 0.1
- processes = pool._pool
- ignore = pool.apply(pow3, [2])
- results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
-
- results = pool = None
-
- time.sleep(DELTA * 2)
-
- for worker in processes:
- assert not worker.is_alive()
-
- print('\tgarbage collection succeeded\n')
+ print(list(pool.imap(f, list(range(10)))))
+ except ZeroDivisionError:
+ print('\tGot ZeroDivisionError as expected from list(pool.imap())')
+ else:
+ raise AssertionError('expected ZeroDivisionError')
+
+ it = pool.imap(f, list(range(10)))
+ for i in range(10):
+ try:
+ x = next(it)
+ except ZeroDivisionError:
+ if i == 5:
+ pass
+ except StopIteration:
+ break
+ else:
+ if i == 5:
+ raise AssertionError('expected ZeroDivisionError')
+
+ assert i == 9
+ print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
+ print()
+
+ #
+ # Testing timeouts
+ #
+
+ print('Testing ApplyResult.get() with timeout:', end=' ')
+ res = pool.apply_async(calculate, TASKS[0])
+ while 1:
+ sys.stdout.flush()
+ try:
+ sys.stdout.write('\n\t%s' % res.get(0.02))
+ break
+ except multiprocessing.TimeoutError:
+ sys.stdout.write('.')
+ print()
+ print()
+
+ print('Testing IMapIterator.next() with timeout:', end=' ')
+ it = pool.imap(calculatestar, TASKS)
+ while 1:
+ sys.stdout.flush()
+ try:
+ sys.stdout.write('\n\t%s' % it.next(0.02))
+ except StopIteration:
+ break
+ except multiprocessing.TimeoutError:
+ sys.stdout.write('.')
+ print()
+ print()
if __name__ == '__main__':
multiprocessing.freeze_support()
-
- assert len(sys.argv) in (1, 2)
-
- if len(sys.argv) == 1 or sys.argv[1] == 'processes':
- print(' Using processes '.center(79, '-'))
- elif sys.argv[1] == 'threads':
- print(' Using threads '.center(79, '-'))
- import multiprocessing.dummy as multiprocessing
- else:
- print('Usage:\n\t%s [processes | threads]' % sys.argv[0])
- raise SystemExit(2)
-
test()
diff --git a/Doc/includes/mp_synchronize.py b/Doc/includes/mp_synchronize.py
deleted file mode 100644
index 81dbc38..0000000
--- a/Doc/includes/mp_synchronize.py
+++ /dev/null
@@ -1,278 +0,0 @@
-#
-# A test file for the `multiprocessing` package
-#
-# Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-
-import time
-import sys
-import random
-from queue import Empty
-
-import multiprocessing # may get overwritten
-
-
-#### TEST_VALUE
-
-def value_func(running, mutex):
- random.seed()
- time.sleep(random.random()*4)
-
- mutex.acquire()
- print('\n\t\t\t' + str(multiprocessing.current_process()) + ' has finished')
- running.value -= 1
- mutex.release()
-
-def test_value():
- TASKS = 10
- running = multiprocessing.Value('i', TASKS)
- mutex = multiprocessing.Lock()
-
- for i in range(TASKS):
- p = multiprocessing.Process(target=value_func, args=(running, mutex))
- p.start()
-
- while running.value > 0:
- time.sleep(0.08)
- mutex.acquire()
- print(running.value, end=' ')
- sys.stdout.flush()
- mutex.release()
-
- print()
- print('No more running processes')
-
-
-#### TEST_QUEUE
-
-def queue_func(queue):
- for i in range(30):
- time.sleep(0.5 * random.random())
- queue.put(i*i)
- queue.put('STOP')
-
-def test_queue():
- q = multiprocessing.Queue()
-
- p = multiprocessing.Process(target=queue_func, args=(q,))
- p.start()
-
- o = None
- while o != 'STOP':
- try:
- o = q.get(timeout=0.3)
- print(o, end=' ')
- sys.stdout.flush()
- except Empty:
- print('TIMEOUT')
-
- print()
-
-
-#### TEST_CONDITION
-
-def condition_func(cond):
- cond.acquire()
- print('\t' + str(cond))
- time.sleep(2)
- print('\tchild is notifying')
- print('\t' + str(cond))
- cond.notify()
- cond.release()
-
-def test_condition():
- cond = multiprocessing.Condition()
-
- p = multiprocessing.Process(target=condition_func, args=(cond,))
- print(cond)
-
- cond.acquire()
- print(cond)
- cond.acquire()
- print(cond)
-
- p.start()
-
- print('main is waiting')
- cond.wait()
- print('main has woken up')
-
- print(cond)
- cond.release()
- print(cond)
- cond.release()
-
- p.join()
- print(cond)
-
-
-#### TEST_SEMAPHORE
-
-def semaphore_func(sema, mutex, running):
- sema.acquire()
-
- mutex.acquire()
- running.value += 1
- print(running.value, 'tasks are running')
- mutex.release()
-
- random.seed()
- time.sleep(random.random()*2)
-
- mutex.acquire()
- running.value -= 1
- print('%s has finished' % multiprocessing.current_process())
- mutex.release()
-
- sema.release()
-
-def test_semaphore():
- sema = multiprocessing.Semaphore(3)
- mutex = multiprocessing.RLock()
- running = multiprocessing.Value('i', 0)
-
- processes = [
- multiprocessing.Process(target=semaphore_func,
- args=(sema, mutex, running))
- for i in range(10)
- ]
-
- for p in processes:
- p.start()
-
- for p in processes:
- p.join()
-
-
-#### TEST_JOIN_TIMEOUT
-
-def join_timeout_func():
- print('\tchild sleeping')
- time.sleep(5.5)
- print('\n\tchild terminating')
-
-def test_join_timeout():
- p = multiprocessing.Process(target=join_timeout_func)
- p.start()
-
- print('waiting for process to finish')
-
- while 1:
- p.join(timeout=1)
- if not p.is_alive():
- break
- print('.', end=' ')
- sys.stdout.flush()
-
-
-#### TEST_EVENT
-
-def event_func(event):
- print('\t%r is waiting' % multiprocessing.current_process())
- event.wait()
- print('\t%r has woken up' % multiprocessing.current_process())
-
-def test_event():
- event = multiprocessing.Event()
-
- processes = [multiprocessing.Process(target=event_func, args=(event,))
- for i in range(5)]
-
- for p in processes:
- p.start()
-
- print('main is sleeping')
- time.sleep(2)
-
- print('main is setting event')
- event.set()
-
- for p in processes:
- p.join()
-
-
-#### TEST_SHAREDVALUES
-
-def sharedvalues_func(values, arrays, shared_values, shared_arrays):
- for i in range(len(values)):
- v = values[i][1]
- sv = shared_values[i].value
- assert v == sv
-
- for i in range(len(values)):
- a = arrays[i][1]
- sa = list(shared_arrays[i][:])
- assert a == sa
-
- print('Tests passed')
-
-def test_sharedvalues():
- values = [
- ('i', 10),
- ('h', -2),
- ('d', 1.25)
- ]
- arrays = [
- ('i', list(range(100))),
- ('d', [0.25 * i for i in range(100)]),
- ('H', list(range(1000)))
- ]
-
- shared_values = [multiprocessing.Value(id, v) for id, v in values]
- shared_arrays = [multiprocessing.Array(id, a) for id, a in arrays]
-
- p = multiprocessing.Process(
- target=sharedvalues_func,
- args=(values, arrays, shared_values, shared_arrays)
- )
- p.start()
- p.join()
-
- assert p.exitcode == 0
-
-
-####
-
-def test(namespace=multiprocessing):
- global multiprocessing
-
- multiprocessing = namespace
-
- for func in [test_value, test_queue, test_condition,
- test_semaphore, test_join_timeout, test_event,
- test_sharedvalues]:
-
- print('\n\t######## %s\n' % func.__name__)
- func()
-
- ignore = multiprocessing.active_children() # cleanup any old processes
- if hasattr(multiprocessing, '_debug_info'):
- info = multiprocessing._debug_info()
- if info:
- print(info)
- raise ValueError('there should be no positive refcounts left')
-
-
-if __name__ == '__main__':
- multiprocessing.freeze_support()
-
- assert len(sys.argv) in (1, 2)
-
- if len(sys.argv) == 1 or sys.argv[1] == 'processes':
- print(' Using processes '.center(79, '-'))
- namespace = multiprocessing
- elif sys.argv[1] == 'manager':
- print(' Using processes and a manager '.center(79, '-'))
- namespace = multiprocessing.Manager()
- namespace.Process = multiprocessing.Process
- namespace.current_process = multiprocessing.current_process
- namespace.active_children = multiprocessing.active_children
- elif sys.argv[1] == 'threads':
- print(' Using threads '.center(79, '-'))
- import multiprocessing.dummy as namespace
- else:
- print('Usage:\n\t%s [processes | manager | threads]' % sys.argv[0])
- raise SystemExit(2)
-
- test(namespace)
diff --git a/Doc/includes/mp_webserver.py b/Doc/includes/mp_webserver.py
deleted file mode 100644
index 651024d..0000000
--- a/Doc/includes/mp_webserver.py
+++ /dev/null
@@ -1,70 +0,0 @@
-#
-# Example where a pool of http servers share a single listening socket
-#
-# On Windows this module depends on the ability to pickle a socket
-# object so that the worker processes can inherit a copy of the server
-# object. (We import `multiprocessing.reduction` to enable this pickling.)
-#
-# Not sure if we should synchronize access to `socket.accept()` method by
-# using a process-shared lock -- does not seem to be necessary.
-#
-# Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-
-import os
-import sys
-
-from multiprocessing import Process, current_process, freeze_support
-from http.server import HTTPServer
-from http.server import SimpleHTTPRequestHandler
-
-if sys.platform == 'win32':
- import multiprocessing.reduction # make sockets pickable/inheritable
-
-
-def note(format, *args):
- sys.stderr.write('[%s]\t%s\n' % (current_process().name, format % args))
-
-
-class RequestHandler(SimpleHTTPRequestHandler):
- # we override log_message() to show which process is handling the request
- def log_message(self, format, *args):
- note(format, *args)
-
-def serve_forever(server):
- note('starting server')
- try:
- server.serve_forever()
- except KeyboardInterrupt:
- pass
-
-
-def runpool(address, number_of_processes):
- # create a single server object -- children will each inherit a copy
- server = HTTPServer(address, RequestHandler)
-
- # create child processes to act as workers
- for i in range(number_of_processes - 1):
- Process(target=serve_forever, args=(server,)).start()
-
- # main process also acts as a worker
- serve_forever(server)
-
-
-def test():
- DIR = os.path.join(os.path.dirname(__file__), '..')
- ADDRESS = ('localhost', 8000)
- NUMBER_OF_PROCESSES = 4
-
- print('Serving at http://%s:%d using %d worker processes' % \
- (ADDRESS[0], ADDRESS[1], NUMBER_OF_PROCESSES))
- print('To exit press Ctrl-' + ['C', 'Break'][sys.platform=='win32'])
-
- os.chdir(DIR)
- runpool(ADDRESS, NUMBER_OF_PROCESSES)
-
-
-if __name__ == '__main__':
- freeze_support()
- test()
diff --git a/Doc/includes/mp_workers.py b/Doc/includes/mp_workers.py
index e66d97b..3b92269 100644
--- a/Doc/includes/mp_workers.py
+++ b/Doc/includes/mp_workers.py
@@ -1,16 +1,3 @@
-#
-# Simple example which uses a pool of workers to carry out some tasks.
-#
-# Notice that the results will probably not come out of the output
-# queue in the same in the same order as the corresponding tasks were
-# put on the input queue. If it is important to get the results back
-# in the original order then consider using `Pool.map()` or
-# `Pool.imap()` (which will save on the amount of code needed anyway).
-#
-# Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-
import time
import random
diff --git a/Doc/library/multiprocessing.rst b/Doc/library/multiprocessing.rst
index f659eac..f58c308 100644
--- a/Doc/library/multiprocessing.rst
+++ b/Doc/library/multiprocessing.rst
@@ -93,11 +93,80 @@ To show the individual process IDs involved, here is an expanded example::
p.start()
p.join()
-For an explanation of why (on Windows) the ``if __name__ == '__main__'`` part is
+For an explanation of why the ``if __name__ == '__main__'`` part is
necessary, see :ref:`multiprocessing-programming`.
+Start methods
+~~~~~~~~~~~~~
+
+Depending on the platform, :mod:`multiprocessing` supports three ways
+to start a process. These *start methods* are
+
+ *spawn*
+ The parent process starts a fresh python interpreter process. The
+ child process will only inherit those resources necessary to run
+ the process objects :meth:`~Process.run` method. In particular,
+ unnecessary file descriptors and handles from the parent process
+ will not be inherited. Starting a process using this method is
+ rather slow compared to using *fork* or *forkserver*.
+
+ Available on Unix and Windows. The default on Windows.
+
+ *fork*
+ The parent process uses :func:`os.fork` to fork the Python
+ interpreter. The child process, when it begins, is effectively
+ identical to the parent process. All resources of the parent are
+ inherited by the child process. Note that safely forking a
+ multithreaded process is problematic.
+
+ Available on Unix only. The default on Unix.
+
+ *forkserver*
+ When the program starts and selects the *forkserver* start method,
+ a server process is started. From then on, whenever a new process
+ is need the parent process connects to the server and requests
+ that it fork a new process. The fork server process is single
+ threaded so it is safe for it to use :func:`os.fork`. No
+ unnecessary resources are inherited.
+
+ Available on Unix platforms which support passing file descriptors
+ over unix pipes.
+
+Before Python 3.4 *fork* was the only option available on Unix. Also,
+prior to Python 3.4, child processes would inherit all the parents
+inheritable handles on Windows.
+
+On Unix using the *spawn* or *forkserver* start methods will also
+start a *semaphore tracker* process which tracks the unlinked named
+semaphores created by processes of the program. When all processes
+have exited the semaphore tracker unlinks any remaining semaphores.
+Usually there should be none, but if a process was killed by a signal
+there may some "leaked" semaphores. (Unlinking the named semaphores
+is a serious matter since the system allows only a limited number, and
+they will not be automatically unlinked until the next reboot.)
+
+To select the a start method you use the :func:`set_start_method` in
+the ``if __name__ == '__main__'`` clause of the main module. For
+example::
+
+ import multiprocessing as mp
+
+ def foo():
+ print('hello')
+
+ if __name__ == '__main__':
+ mp.set_start_method('spawn')
+ p = mp.Process(target=foo)
+ p.start()
+ p.join()
+
+:func:`set_start_method` should not be used more than once in the
+program.
+
+
+
Exchanging objects between processes
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -274,15 +343,31 @@ processes in a few different ways.
For example::
from multiprocessing import Pool
+ from time import sleep
def f(x):
return x*x
if __name__ == '__main__':
- with Pool(processes=4) as pool: # start 4 worker processes
- result = pool.apply_async(f, [10]) # evaluate "f(10)" asynchronously
- print(result.get(timeout=1)) # prints "100" unless your computer is *very* slow
- print(pool.map(f, range(10))) # prints "[0, 1, 4,..., 81]"
+ # start 4 worker processes
+ with Pool(processes=4) as pool:
+
+ # print "[0, 1, 4,..., 81]"
+ print(pool.map(f, range(10)))
+
+ # print same numbers in arbitrary order
+ for i in pool.imap_unordered(f, range(10)):
+ print(i)
+
+ # evaluate "f(10)" asynchronously
+ res = pool.apply_async(f, [10])
+ print(res.get(timeout=1)) # prints "100"
+
+ # make worker sleep for 10 secs
+ res = pool.apply_async(sleep, 10)
+ print(res.get(timeout=1)) # raises multiprocessing.TimeoutError
+
+ # exiting the 'with'-block has stopped the pool
Note that the methods of a pool should only ever be used by the
process which created it.
@@ -763,6 +848,24 @@ Miscellaneous
If the module is being run normally by the Python interpreter then
:func:`freeze_support` has no effect.
+.. function:: get_all_start_methods()
+
+ Returns a list of the supported start methods, the first of which
+ is the default. The possible start methods are ``'fork'``,
+ ``'spawn'`` and ``'forkserver'``. On Windows only ``'spawn'`` is
+ available. On Unix ``'fork'`` and ``'spawn'`` are always
+ supported, with ``'fork'`` being the default.
+
+ .. versionadded:: 3.4
+
+.. function:: get_start_method()
+
+ Return the current start method. This can be ``'fork'``,
+ ``'spawn'`` or ``'forkserver'``. ``'fork'`` is the default on
+ Unix, while ``'spawn'`` is the default on Windows.
+
+ .. versionadded:: 3.4
+
.. function:: set_executable()
Sets the path of the Python interpreter to use when starting a child process.
@@ -771,8 +874,21 @@ Miscellaneous
set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))
- before they can create child processes. (Windows only)
+ before they can create child processes.
+
+ .. versionchanged:: 3.4
+ Now supported on Unix when the ``'spawn'`` start method is used.
+
+.. function:: set_start_method(method)
+
+ Set the method which should be used to start child processes.
+ *method* can be ``'fork'``, ``'spawn'`` or ``'forkserver'``.
+
+ Note that this should be called at most once, and it should be
+ protected inside the ``if __name__ == '__main__'`` clause of the
+ main module.
+ .. versionadded:: 3.4
.. note::
@@ -2175,43 +2291,8 @@ Below is an example session with logging turned on::
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0
-In addition to having these two logging functions, the multiprocessing also
-exposes two additional logging level attributes. These are :const:`SUBWARNING`
-and :const:`SUBDEBUG`. The table below illustrates where theses fit in the
-normal level hierarchy.
-
-+----------------+----------------+
-| Level | Numeric value |
-+================+================+
-| ``SUBWARNING`` | 25 |
-+----------------+----------------+
-| ``SUBDEBUG`` | 5 |
-+----------------+----------------+
-
For a full table of logging levels, see the :mod:`logging` module.
-These additional logging levels are used primarily for certain debug messages
-within the multiprocessing module. Below is the same example as above, except
-with :const:`SUBDEBUG` enabled::
-
- >>> import multiprocessing, logging
- >>> logger = multiprocessing.log_to_stderr()
- >>> logger.setLevel(multiprocessing.SUBDEBUG)
- >>> logger.warning('doomed')
- [WARNING/MainProcess] doomed
- >>> m = multiprocessing.Manager()
- [INFO/SyncManager-...] child process calling self.run()
- [INFO/SyncManager-...] created temp directory /.../pymp-...
- [INFO/SyncManager-...] manager serving at '/.../pymp-djGBXN/listener-...'
- >>> del m
- [SUBDEBUG/MainProcess] finalizer calling ...
- [INFO/MainProcess] sending shutdown message to manager
- [DEBUG/SyncManager-...] manager received shutdown message
- [SUBDEBUG/SyncManager-...] calling <Finalize object, callback=unlink, ...
- [SUBDEBUG/SyncManager-...] finalizer calling <built-in function unlink> ...
- [SUBDEBUG/SyncManager-...] calling <Finalize object, dead>
- [SUBDEBUG/SyncManager-...] finalizer calling <function rmtree at 0x5aa730> ...
- [INFO/SyncManager-...] manager exiting with exitcode 0
The :mod:`multiprocessing.dummy` module
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -2232,8 +2313,10 @@ There are certain guidelines and idioms which should be adhered to when using
:mod:`multiprocessing`.
-All platforms
-~~~~~~~~~~~~~
+All start methods
+~~~~~~~~~~~~~~~~~
+
+The following applies to all start methods.
Avoid shared state
@@ -2266,11 +2349,13 @@ Joining zombie processes
Better to inherit than pickle/unpickle
- On Windows many types from :mod:`multiprocessing` need to be picklable so
- that child processes can use them. However, one should generally avoid
- sending shared objects to other processes using pipes or queues. Instead
- you should arrange the program so that a process which needs access to a
- shared resource created elsewhere can inherit it from an ancestor process.
+ When using the *spawn* or *forkserver* start methods many types
+ from :mod:`multiprocessing` need to be picklable so that child
+ processes can use them. However, one should generally avoid
+ sending shared objects to other processes using pipes or queues.
+ Instead you should arrange the program so that a process which
+ needs access to a shared resource created elsewhere can inherit it
+ from an ancestor process.
Avoid terminating processes
@@ -2314,15 +2399,17 @@ Joining processes that use queues
Explicitly pass resources to child processes
- On Unix a child process can make use of a shared resource created in a
- parent process using a global resource. However, it is better to pass the
- object as an argument to the constructor for the child process.
+ On Unix using the *fork* start method, a child process can make
+ use of a shared resource created in a parent process using a
+ global resource. However, it is better to pass the object as an
+ argument to the constructor for the child process.
- Apart from making the code (potentially) compatible with Windows this also
- ensures that as long as the child process is still alive the object will not
- be garbage collected in the parent process. This might be important if some
- resource is freed when the object is garbage collected in the parent
- process.
+ Apart from making the code (potentially) compatible with Windows
+ and the other start methods this also ensures that as long as the
+ child process is still alive the object will not be garbage
+ collected in the parent process. This might be important if some
+ resource is freed when the object is garbage collected in the
+ parent process.
So for instance ::
@@ -2381,17 +2468,19 @@ Beware of replacing :data:`sys.stdin` with a "file like object"
For more information, see :issue:`5155`, :issue:`5313` and :issue:`5331`
-Windows
-~~~~~~~
+The *spawn* and *forkserver* start methods
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-Since Windows lacks :func:`os.fork` it has a few extra restrictions:
+There are a few extra restriction which don't apply to the *fork*
+start method.
More picklability
- Ensure that all arguments to :meth:`Process.__init__` are picklable. This
- means, in particular, that bound or unbound methods cannot be used directly
- as the ``target`` argument on Windows --- just define a function and use
- that instead.
+ Ensure that all arguments to :meth:`Process.__init__` are
+ picklable. This means, in particular, that bound or unbound
+ methods cannot be used directly as the ``target`` (unless you use
+ the *fork* start method) --- just define a function and use that
+ instead.
Also, if you subclass :class:`Process` then make sure that instances will be
picklable when the :meth:`Process.start` method is called.
@@ -2411,7 +2500,8 @@ Safe importing of main module
interpreter without causing unintended side effects (such a starting a new
process).
- For example, under Windows running the following module would fail with a
+ For example, using the *spawn* or *forkserver* start method
+ running the following module would fail with a
:exc:`RuntimeError`::
from multiprocessing import Process
@@ -2425,13 +2515,14 @@ Safe importing of main module
Instead one should protect the "entry point" of the program by using ``if
__name__ == '__main__':`` as follows::
- from multiprocessing import Process, freeze_support
+ from multiprocessing import Process, freeze_support, set_start_method
def foo():
print('hello')
if __name__ == '__main__':
freeze_support()
+ set_start_method('spawn')
p = Process(target=foo)
p.start()
@@ -2462,26 +2553,7 @@ Using :class:`Pool`:
:language: python3
-Synchronization types like locks, conditions and queues:
-
-.. literalinclude:: ../includes/mp_synchronize.py
- :language: python3
-
-
An example showing how to use queues to feed tasks to a collection of worker
processes and collect the results:
.. literalinclude:: ../includes/mp_workers.py
-
-
-An example of how a pool of worker processes can each run a
-:class:`~http.server.SimpleHTTPRequestHandler` instance while sharing a single
-listening socket.
-
-.. literalinclude:: ../includes/mp_webserver.py
-
-
-Some simple benchmarks comparing :mod:`multiprocessing` with :mod:`threading`:
-
-.. literalinclude:: ../includes/mp_benchmarks.py
-
diff --git a/Doc/whatsnew/3.4.rst b/Doc/whatsnew/3.4.rst
index 937da00..1c30d43 100644
--- a/Doc/whatsnew/3.4.rst
+++ b/Doc/whatsnew/3.4.rst
@@ -108,6 +108,8 @@ Significantly Improved Library Modules:
* Single-dispatch generic functions (:pep:`443`)
* SHA-3 (Keccak) support for :mod:`hashlib`.
* TLSv1.1 and TLSv1.2 support for :mod:`ssl`.
+* :mod:`multiprocessing` now has option to avoid using :func:`os.fork`
+ on Unix (:issue:`8713`).
Security improvements:
@@ -254,6 +256,17 @@ mmap objects can now be weakref'ed.
(Contributed by Valerie Lambert in :issue:`4885`.)
+multiprocessing
+---------------
+
+On Unix two new *start methods* have been added for starting processes
+using :mod:`multiprocessing`. These make the mixing of processes with
+threads more robust. See :issue:`8713`.
+
+Also, except when using the old *fork* start method, child processes
+will no longer inherit unneeded handles/file descriptors from their parents.
+
+
poplib
------