summaryrefslogtreecommitdiffstats
path: root/Lib/multiprocessing
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/multiprocessing')
-rw-r--r--Lib/multiprocessing/__init__.py13
-rw-r--r--Lib/multiprocessing/connection.py23
-rw-r--r--Lib/multiprocessing/dummy/__init__.py27
-rw-r--r--Lib/multiprocessing/dummy/connection.py27
-rw-r--r--Lib/multiprocessing/forking.py35
-rw-r--r--Lib/multiprocessing/managers.py2
-rw-r--r--Lib/multiprocessing/pool.py45
-rw-r--r--Lib/multiprocessing/queues.py65
8 files changed, 99 insertions, 138 deletions
diff --git a/Lib/multiprocessing/__init__.py b/Lib/multiprocessing/__init__.py
index 1f3e67c..b5f16d7 100644
--- a/Lib/multiprocessing/__init__.py
+++ b/Lib/multiprocessing/__init__.py
@@ -8,10 +8,6 @@
# subpackage 'multiprocessing.dummy' has the same API but is a simple
# wrapper for 'threading'.
#
-# Try calling `multiprocessing.doc.main()` to read the html
-# documentation in a webbrowser.
-#
-#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#
@@ -27,8 +23,6 @@ __all__ = [
'Value', 'Array', 'RawValue', 'RawArray', 'SUBDEBUG', 'SUBWARNING',
]
-__author__ = 'R. Oudkerk (r.m.oudkerk@gmail.com)'
-
#
# Imports
#
@@ -40,6 +34,13 @@ from multiprocessing.process import Process, current_process, active_children
from multiprocessing.util import SUBDEBUG, SUBWARNING
#
+# Alias for main module -- will be reset by bootstrapping child processes
+#
+
+if '__main__' in sys.modules:
+ sys.modules['__mp_main__'] = sys.modules['__main__']
+
+#
# Exceptions
#
diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py
index df57906..47e2123 100644
--- a/Lib/multiprocessing/connection.py
+++ b/Lib/multiprocessing/connection.py
@@ -12,7 +12,6 @@ __all__ = [ 'Client', 'Listener', 'Pipe', 'wait' ]
import io
import os
import sys
-import pickle
import select
import socket
import struct
@@ -132,22 +131,22 @@ class _ConnectionBase:
def _check_closed(self):
if self._handle is None:
- raise IOError("handle is closed")
+ raise OSError("handle is closed")
def _check_readable(self):
if not self._readable:
- raise IOError("connection is write-only")
+ raise OSError("connection is write-only")
def _check_writable(self):
if not self._writable:
- raise IOError("connection is read-only")
+ raise OSError("connection is read-only")
def _bad_message_length(self):
if self._writable:
self._readable = False
else:
self.close()
- raise IOError("bad message length")
+ raise OSError("bad message length")
@property
def closed(self):
@@ -202,9 +201,7 @@ class _ConnectionBase:
"""Send a (picklable) object"""
self._check_closed()
self._check_writable()
- buf = io.BytesIO()
- ForkingPickler(buf, pickle.HIGHEST_PROTOCOL).dump(obj)
- self._send_bytes(buf.getbuffer())
+ self._send_bytes(ForkingPickler.dumps(obj))
def recv_bytes(self, maxlength=None):
"""
@@ -249,7 +246,7 @@ class _ConnectionBase:
self._check_closed()
self._check_readable()
buf = self._recv_bytes()
- return pickle.loads(buf.getbuffer())
+ return ForkingPickler.loads(buf.getbuffer())
def poll(self, timeout=0.0):
"""Whether there is any input available to be read"""
@@ -317,7 +314,7 @@ if _winapi:
return f
elif err == _winapi.ERROR_MORE_DATA:
return self._get_more_data(ov, maxsize)
- except IOError as e:
+ except OSError as e:
if e.winerror == _winapi.ERROR_BROKEN_PIPE:
raise EOFError
else:
@@ -383,7 +380,7 @@ class Connection(_ConnectionBase):
if remaining == size:
raise EOFError
else:
- raise IOError("got end of file during message")
+ raise OSError("got end of file during message")
buf.write(chunk)
remaining -= n
return buf
@@ -443,7 +440,7 @@ class Listener(object):
Returns a `Connection` object.
'''
if self._listener is None:
- raise IOError('listener is closed')
+ raise OSError('listener is closed')
c = self._listener.accept()
if self._authkey:
deliver_challenge(c, self._authkey)
@@ -676,7 +673,7 @@ if sys.platform == 'win32':
0, _winapi.NULL, _winapi.OPEN_EXISTING,
_winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL
)
- except WindowsError as e:
+ except OSError as e:
if e.winerror not in (_winapi.ERROR_SEM_TIMEOUT,
_winapi.ERROR_PIPE_BUSY) or _check_timeout(t):
raise
diff --git a/Lib/multiprocessing/dummy/__init__.py b/Lib/multiprocessing/dummy/__init__.py
index e31fc61..20ae957 100644
--- a/Lib/multiprocessing/dummy/__init__.py
+++ b/Lib/multiprocessing/dummy/__init__.py
@@ -4,32 +4,7 @@
# multiprocessing/dummy/__init__.py
#
# Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions
-# are met:
-#
-# 1. Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# 2. Redistributions in binary form must reproduce the above copyright
-# notice, this list of conditions and the following disclaimer in the
-# documentation and/or other materials provided with the distribution.
-# 3. Neither the name of author nor the names of any contributors may be
-# used to endorse or promote products derived from this software
-# without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
-# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
-# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
-# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
-# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
-# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
-# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
-# SUCH DAMAGE.
+# Licensed to PSF under a Contributor Agreement.
#
__all__ = [
diff --git a/Lib/multiprocessing/dummy/connection.py b/Lib/multiprocessing/dummy/connection.py
index 874ec8e..694ef96 100644
--- a/Lib/multiprocessing/dummy/connection.py
+++ b/Lib/multiprocessing/dummy/connection.py
@@ -4,32 +4,7 @@
# multiprocessing/dummy/connection.py
#
# Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions
-# are met:
-#
-# 1. Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# 2. Redistributions in binary form must reproduce the above copyright
-# notice, this list of conditions and the following disclaimer in the
-# documentation and/or other materials provided with the distribution.
-# 3. Neither the name of author nor the names of any contributors may be
-# used to endorse or promote products derived from this software
-# without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
-# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
-# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
-# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
-# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
-# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
-# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
-# SUCH DAMAGE.
+# Licensed to PSF under a Contributor Agreement.
#
__all__ = [ 'Client', 'Listener', 'Pipe' ]
diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py
index 0bb21c4..37c9a10 100644
--- a/Lib/multiprocessing/forking.py
+++ b/Lib/multiprocessing/forking.py
@@ -7,7 +7,9 @@
# Licensed to PSF under a Contributor Agreement.
#
+import io
import os
+import pickle
import sys
import signal
import errno
@@ -44,6 +46,15 @@ class ForkingPickler(Pickler):
def register(cls, type, reduce):
cls._extra_reducers[type] = reduce
+ @staticmethod
+ def dumps(obj):
+ buf = io.BytesIO()
+ ForkingPickler(buf, pickle.HIGHEST_PROTOCOL).dump(obj)
+ return buf.getbuffer()
+
+ loads = pickle.loads
+
+
def _reduce_method(m):
if m.__self__ is None:
return getattr, (m.__class__, m.__func__.__name__)
@@ -113,7 +124,7 @@ if sys.platform != 'win32':
while True:
try:
pid, sts = os.waitpid(self.pid, flag)
- except os.error as e:
+ except OSError as e:
if e.errno == errno.EINTR:
continue
# Child process not yet created. See #1731717
@@ -448,27 +459,17 @@ def prepare(data):
dirs = [os.path.dirname(main_path)]
assert main_name not in sys.modules, main_name
+ sys.modules.pop('__mp_main__', None)
file, path_name, etc = imp.find_module(main_name, dirs)
try:
- # We would like to do "imp.load_module('__main__', ...)"
- # here. However, that would cause 'if __name__ ==
- # "__main__"' clauses to be executed.
+ # We should not do 'imp.load_module("__main__", ...)'
+ # since that would execute 'if __name__ == "__main__"'
+ # clauses, potentially causing a psuedo fork bomb.
main_module = imp.load_module(
- '__parents_main__', file, path_name, etc
+ '__mp_main__', file, path_name, etc
)
finally:
if file:
file.close()
- sys.modules['__main__'] = main_module
- main_module.__name__ = '__main__'
-
- # Try to make the potentially picklable objects in
- # sys.modules['__main__'] realize they are in the main
- # module -- somewhat ugly.
- for obj in list(main_module.__dict__.values()):
- try:
- if obj.__module__ == '__parents_main__':
- obj.__module__ = '__main__'
- except Exception:
- pass
+ sys.modules['__main__'] = sys.modules['__mp_main__'] = main_module
diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py
index 1ab147e..30ef771 100644
--- a/Lib/multiprocessing/managers.py
+++ b/Lib/multiprocessing/managers.py
@@ -167,7 +167,7 @@ class Server(object):
while True:
try:
c = self.listener.accept()
- except (OSError, IOError):
+ except OSError:
continue
t = threading.Thread(target=self.handle_request, args=(c,))
t.daemon = True
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py
index fc9d904..bcf8a37 100644
--- a/Lib/multiprocessing/pool.py
+++ b/Lib/multiprocessing/pool.py
@@ -18,6 +18,7 @@ import queue
import itertools
import collections
import time
+import traceback
from multiprocessing import Process, cpu_count, TimeoutError
from multiprocessing.util import Finalize, debug
@@ -43,6 +44,29 @@ def starmapstar(args):
return list(itertools.starmap(args[0], args[1]))
#
+# Hack to embed stringification of remote traceback in local traceback
+#
+
+class RemoteTraceback(Exception):
+ def __init__(self, tb):
+ self.tb = tb
+ def __str__(self):
+ return self.tb
+
+class ExceptionWithTraceback:
+ def __init__(self, exc, tb):
+ tb = traceback.format_exception(type(exc), exc, tb)
+ tb = ''.join(tb)
+ self.exc = exc
+ self.tb = '\n"""\n%s"""' % tb
+ def __reduce__(self):
+ return rebuild_exc, (self.exc, self.tb)
+
+def rebuild_exc(exc, tb):
+ exc.__cause__ = RemoteTraceback(tb)
+ return exc
+
+#
# Code run by worker processes
#
@@ -78,8 +102,8 @@ def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
while maxtasks is None or (maxtasks and completed < maxtasks):
try:
task = get()
- except (EOFError, IOError):
- debug('worker got EOFError or IOError -- exiting')
+ except (EOFError, OSError):
+ debug('worker got EOFError or OSError -- exiting')
break
if task is None:
@@ -90,6 +114,7 @@ def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
try:
result = (True, func(*args, **kwds))
except Exception as e:
+ e = ExceptionWithTraceback(e, e.__traceback__)
result = (False, e)
try:
put((job, i, result))
@@ -349,7 +374,7 @@ class Pool(object):
break
try:
put(task)
- except IOError:
+ except OSError:
debug('could not put task on queue')
break
else:
@@ -371,8 +396,8 @@ class Pool(object):
debug('task handler sending sentinel to workers')
for p in pool:
put(None)
- except IOError:
- debug('task handler got IOError when sending sentinels')
+ except OSError:
+ debug('task handler got OSError when sending sentinels')
debug('task handler exiting')
@@ -383,8 +408,8 @@ class Pool(object):
while 1:
try:
task = get()
- except (IOError, EOFError):
- debug('result handler got EOFError/IOError -- exiting')
+ except (OSError, EOFError):
+ debug('result handler got EOFError/OSError -- exiting')
return
if thread._state:
@@ -405,8 +430,8 @@ class Pool(object):
while cache and thread._state != TERMINATE:
try:
task = get()
- except (IOError, EOFError):
- debug('result handler got EOFError/IOError -- exiting')
+ except (OSError, EOFError):
+ debug('result handler got EOFError/OSError -- exiting')
return
if task is None:
@@ -428,7 +453,7 @@ class Pool(object):
if not outqueue._reader.poll():
break
get()
- except (IOError, EOFError):
+ except (OSError, EOFError):
pass
debug('result handler exiting: len(cache)=%s, thread._state=%s',
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
index 37271fb..ec188ee 100644
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -22,7 +22,7 @@ import _multiprocessing
from multiprocessing.connection import Pipe
from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition
from multiprocessing.util import debug, info, Finalize, register_after_fork
-from multiprocessing.forking import assert_spawning
+from multiprocessing.forking import assert_spawning, ForkingPickler
#
# Queue type using a pipe, buffer and thread
@@ -69,8 +69,8 @@ class Queue(object):
self._joincancelled = False
self._closed = False
self._close = None
- self._send = self._writer.send
- self._recv = self._reader.recv
+ self._send_bytes = self._writer.send_bytes
+ self._recv_bytes = self._reader.recv_bytes
self._poll = self._reader.poll
def put(self, obj, block=True, timeout=None):
@@ -89,14 +89,9 @@ class Queue(object):
def get(self, block=True, timeout=None):
if block and timeout is None:
- self._rlock.acquire()
- try:
- res = self._recv()
- self._sem.release()
- return res
- finally:
- self._rlock.release()
-
+ with self._rlock:
+ res = self._recv_bytes()
+ self._sem.release()
else:
if block:
deadline = time.time() + timeout
@@ -109,11 +104,12 @@ class Queue(object):
raise Empty
elif not self._poll():
raise Empty
- res = self._recv()
+ res = self._recv_bytes()
self._sem.release()
- return res
finally:
self._rlock.release()
+ # unserialize the data after having released the lock
+ return ForkingPickler.loads(res)
def qsize(self):
# Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
@@ -158,7 +154,7 @@ class Queue(object):
self._buffer.clear()
self._thread = threading.Thread(
target=Queue._feed,
- args=(self._buffer, self._notempty, self._send,
+ args=(self._buffer, self._notempty, self._send_bytes,
self._wlock, self._writer.close, self._ignore_epipe),
name='QueueFeederThread'
)
@@ -210,7 +206,7 @@ class Queue(object):
notempty.release()
@staticmethod
- def _feed(buffer, notempty, send, writelock, close, ignore_epipe):
+ def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe):
debug('starting thread to feed data to pipe')
from .util import is_exiting
@@ -241,12 +237,14 @@ class Queue(object):
close()
return
+ # serialize the data before acquiring the lock
+ obj = ForkingPickler.dumps(obj)
if wacquire is None:
- send(obj)
+ send_bytes(obj)
else:
wacquire()
try:
- send(obj)
+ send_bytes(obj)
finally:
wrelease()
except IndexError:
@@ -340,7 +338,6 @@ class SimpleQueue(object):
self._wlock = None
else:
self._wlock = Lock()
- self._make_methods()
def empty(self):
return not self._poll()
@@ -351,29 +348,19 @@ class SimpleQueue(object):
def __setstate__(self, state):
(self._reader, self._writer, self._rlock, self._wlock) = state
- self._make_methods()
- def _make_methods(self):
- recv = self._reader.recv
- racquire, rrelease = self._rlock.acquire, self._rlock.release
- def get():
- racquire()
- try:
- return recv()
- finally:
- rrelease()
- self.get = get
+ def get(self):
+ with self._rlock:
+ res = self._reader.recv_bytes()
+ # unserialize the data after having released the lock
+ return ForkingPickler.loads(res)
+ def put(self, obj):
+ # serialize the data before acquiring the lock
+ obj = ForkingPickler.dumps(obj)
if self._wlock is None:
# writes to a message oriented win32 pipe are atomic
- self.put = self._writer.send
+ self._writer.send_bytes(obj)
else:
- send = self._writer.send
- wacquire, wrelease = self._wlock.acquire, self._wlock.release
- def put(obj):
- wacquire()
- try:
- return send(obj)
- finally:
- wrelease()
- self.put = put
+ with self._wlock:
+ self._writer.send_bytes(obj)