Diffstat (limited to 'Lib/multiprocessing')
-rw-r--r--   Lib/multiprocessing/__init__.py         | 13
-rw-r--r--   Lib/multiprocessing/connection.py       | 23
-rw-r--r--   Lib/multiprocessing/dummy/__init__.py   | 27
-rw-r--r--   Lib/multiprocessing/dummy/connection.py | 27
-rw-r--r--   Lib/multiprocessing/forking.py          | 35
-rw-r--r--   Lib/multiprocessing/managers.py         |  2
-rw-r--r--   Lib/multiprocessing/pool.py             | 45
-rw-r--r--   Lib/multiprocessing/queues.py           | 65
8 files changed, 99 insertions(+), 138 deletions(-)
diff --git a/Lib/multiprocessing/__init__.py b/Lib/multiprocessing/__init__.py
index 1f3e67c..b5f16d7 100644
--- a/Lib/multiprocessing/__init__.py
+++ b/Lib/multiprocessing/__init__.py
@@ -8,10 +8,6 @@
 # subpackage 'multiprocessing.dummy' has the same API but is a simple
 # wrapper for 'threading'.
 #
-# Try calling `multiprocessing.doc.main()` to read the html
-# documentation in a webbrowser.
-#
-#
 # Copyright (c) 2006-2008, R Oudkerk
 # Licensed to PSF under a Contributor Agreement.
 #
@@ -27,8 +23,6 @@ __all__ = [
     'Value', 'Array', 'RawValue', 'RawArray', 'SUBDEBUG', 'SUBWARNING',
     ]
 
-__author__ = 'R. Oudkerk (r.m.oudkerk@gmail.com)'
-
 #
 # Imports
 #
@@ -40,6 +34,13 @@ from multiprocessing.process import Process, current_process, active_children
 from multiprocessing.util import SUBDEBUG, SUBWARNING
 
 #
+# Alias for main module -- will be reset by bootstrapping child processes
+#
+
+if '__main__' in sys.modules:
+    sys.modules['__mp_main__'] = sys.modules['__main__']
+
+#
 # Exceptions
 #
 
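Note: the new '__mp_main__' alias gives the parent's main module a second, stable name; the reworked prepare() in forking.py further down loads the child's copy of the main script under that name instead of the old '__parents_main__'. A minimal sketch of the aliasing as seen from the parent process, when run as a script (illustration only, not part of the patch):

    import sys
    import multiprocessing   # importing the package installs the alias

    # the running script is registered under both names, so a pickled
    # reference to something in __main__ can also be resolved via __mp_main__
    assert sys.modules['__mp_main__'] is sys.modules['__main__']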
diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py
index df57906..47e2123 100644
--- a/Lib/multiprocessing/connection.py
+++ b/Lib/multiprocessing/connection.py
@@ -12,7 +12,6 @@ __all__ = [ 'Client', 'Listener', 'Pipe', 'wait' ]
 import io
 import os
 import sys
-import pickle
 import select
 import socket
 import struct
@@ -132,22 +131,22 @@ class _ConnectionBase:
 
     def _check_closed(self):
         if self._handle is None:
-            raise IOError("handle is closed")
+            raise OSError("handle is closed")
 
     def _check_readable(self):
         if not self._readable:
-            raise IOError("connection is write-only")
+            raise OSError("connection is write-only")
 
     def _check_writable(self):
         if not self._writable:
-            raise IOError("connection is read-only")
+            raise OSError("connection is read-only")
 
     def _bad_message_length(self):
         if self._writable:
             self._readable = False
         else:
             self.close()
-        raise IOError("bad message length")
+        raise OSError("bad message length")
 
     @property
     def closed(self):
@@ -202,9 +201,7 @@
         """Send a (picklable) object"""
         self._check_closed()
         self._check_writable()
-        buf = io.BytesIO()
-        ForkingPickler(buf, pickle.HIGHEST_PROTOCOL).dump(obj)
-        self._send_bytes(buf.getbuffer())
+        self._send_bytes(ForkingPickler.dumps(obj))
 
     def recv_bytes(self, maxlength=None):
         """
@@ -249,7 +246,7 @@
         self._check_closed()
         self._check_readable()
         buf = self._recv_bytes()
-        return pickle.loads(buf.getbuffer())
+        return ForkingPickler.loads(buf.getbuffer())
 
     def poll(self, timeout=0.0):
         """Whether there is any input available to be read"""
@@ -317,7 +314,7 @@ if _winapi:
                             return f
                         elif err == _winapi.ERROR_MORE_DATA:
                             return self._get_more_data(ov, maxsize)
-                except IOError as e:
+                except OSError as e:
                     if e.winerror == _winapi.ERROR_BROKEN_PIPE:
                         raise EOFError
                     else:
@@ -383,7 +380,7 @@ class Connection(_ConnectionBase):
                 if remaining == size:
                     raise EOFError
                 else:
-                    raise IOError("got end of file during message")
+                    raise OSError("got end of file during message")
             buf.write(chunk)
             remaining -= n
         return buf
@@ -443,7 +440,7 @@ class Listener(object):
         Returns a `Connection` object.
         '''
         if self._listener is None:
-            raise IOError('listener is closed')
+            raise OSError('listener is closed')
         c = self._listener.accept()
         if self._authkey:
             deliver_challenge(c, self._authkey)
@@ -676,7 +673,7 @@ if sys.platform == 'win32':
                     0, _winapi.NULL, _winapi.OPEN_EXISTING,
                     _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL
                     )
-            except WindowsError as e:
+            except OSError as e:
                 if e.winerror not in (_winapi.ERROR_SEM_TIMEOUT,
                                       _winapi.ERROR_PIPE_BUSY) or _check_timeout(t):
                     raise
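Note: Connection.send() and recv() now delegate pickling to the ForkingPickler.dumps()/loads() helpers added in forking.py below, and the IOError/WindowsError cases become plain OSError (their alias since Python 3.3); the user-visible API is unchanged. A small round-trip sketch (illustration only, not from the patch):

    from multiprocessing import Pipe

    a, b = Pipe()                       # duplex connection pair
    a.send({'answer': 42})              # internally ForkingPickler.dumps()
    assert b.recv() == {'answer': 42}   # internally ForkingPickler.loads()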
diff --git a/Lib/multiprocessing/dummy/__init__.py b/Lib/multiprocessing/dummy/__init__.py
index e31fc61..20ae957 100644
--- a/Lib/multiprocessing/dummy/__init__.py
+++ b/Lib/multiprocessing/dummy/__init__.py
@@ -4,32 +4,7 @@
 # multiprocessing/dummy/__init__.py
 #
 # Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions
-# are met:
-#
-# 1. Redistributions of source code must retain the above copyright
-#    notice, this list of conditions and the following disclaimer.
-# 2. Redistributions in binary form must reproduce the above copyright
-#    notice, this list of conditions and the following disclaimer in the
-#    documentation and/or other materials provided with the distribution.
-# 3. Neither the name of author nor the names of any contributors may be
-#    used to endorse or promote products derived from this software
-#    without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
-# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
-# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
-# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
-# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
-# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
-# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
-# SUCH DAMAGE.
+# Licensed to PSF under a Contributor Agreement.
 #
 
 __all__ = [
diff --git a/Lib/multiprocessing/dummy/connection.py b/Lib/multiprocessing/dummy/connection.py
index 874ec8e..694ef96 100644
--- a/Lib/multiprocessing/dummy/connection.py
+++ b/Lib/multiprocessing/dummy/connection.py
@@ -4,32 +4,7 @@
 # multiprocessing/dummy/connection.py
 #
 # Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions
-# are met:
-#
-# 1. Redistributions of source code must retain the above copyright
-#    notice, this list of conditions and the following disclaimer.
-# 2. Redistributions in binary form must reproduce the above copyright
-#    notice, this list of conditions and the following disclaimer in the
-#    documentation and/or other materials provided with the distribution.
-# 3. Neither the name of author nor the names of any contributors may be
-#    used to endorse or promote products derived from this software
-#    without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
-# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
-# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
-# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
-# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
-# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
-# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
-# SUCH DAMAGE.
+# Licensed to PSF under a Contributor Agreement.
 #
 
 __all__ = [ 'Client', 'Listener', 'Pipe' ]
diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py
index 0bb21c4..37c9a10 100644
--- a/Lib/multiprocessing/forking.py
+++ b/Lib/multiprocessing/forking.py
@@ -7,7 +7,9 @@
 # Licensed to PSF under a Contributor Agreement.
 #
 
+import io
 import os
+import pickle
 import sys
 import signal
 import errno
@@ -44,6 +46,15 @@ class ForkingPickler(Pickler):
     def register(cls, type, reduce):
         cls._extra_reducers[type] = reduce
 
+    @staticmethod
+    def dumps(obj):
+        buf = io.BytesIO()
+        ForkingPickler(buf, pickle.HIGHEST_PROTOCOL).dump(obj)
+        return buf.getbuffer()
+
+    loads = pickle.loads
+
+
 def _reduce_method(m):
     if m.__self__ is None:
         return getattr, (m.__class__, m.__func__.__name__)
@@ -113,7 +124,7 @@ if sys.platform != 'win32':
                 while True:
                     try:
                         pid, sts = os.waitpid(self.pid, flag)
-                    except os.error as e:
+                    except OSError as e:
                         if e.errno == errno.EINTR:
                             continue
                         # Child process not yet created. See #1731717
@@ -448,27 +459,17 @@ def prepare(data):
             dirs = [os.path.dirname(main_path)]
 
         assert main_name not in sys.modules, main_name
+        sys.modules.pop('__mp_main__', None)
         file, path_name, etc = imp.find_module(main_name, dirs)
         try:
-            # We would like to do "imp.load_module('__main__', ...)"
-            # here. However, that would cause 'if __name__ ==
-            # "__main__"' clauses to be executed.
+            # We should not do 'imp.load_module("__main__", ...)'
+            # since that would execute 'if __name__ == "__main__"'
+            # clauses, potentially causing a pseudo fork bomb.
             main_module = imp.load_module(
-                '__parents_main__', file, path_name, etc
+                '__mp_main__', file, path_name, etc
                 )
         finally:
             if file:
                 file.close()
 
-        sys.modules['__main__'] = main_module
-        main_module.__name__ = '__main__'
-
-        # Try to make the potentially picklable objects in
-        # sys.modules['__main__'] realize they are in the main
-        # module -- somewhat ugly.
-        for obj in list(main_module.__dict__.values()):
-            try:
-                if obj.__module__ == '__parents_main__':
-                    obj.__module__ = '__main__'
-            except Exception:
-                pass
+        sys.modules['__main__'] = sys.modules['__mp_main__'] = main_module
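Note: ForkingPickler.dumps() pickles an object into an in-memory buffer and hands back the underlying memoryview, so callers such as Connection.send() and the queue feeder thread can serialize once and then push raw bytes. A standalone sketch of the same buffer-and-memoryview pattern using plain pickle (illustration only, not the patch itself):

    import io
    import pickle

    def dumps(obj):
        # pickle into an in-memory buffer and expose it as a memoryview,
        # which send_bytes() can write without another copy
        buf = io.BytesIO()
        pickle.Pickler(buf, pickle.HIGHEST_PROTOCOL).dump(obj)
        return buf.getbuffer()

    loads = pickle.loads   # bytes-like objects unpickle directly

    data = dumps([1, 2, 3])
    assert isinstance(data, memoryview)
    assert loads(data) == [1, 2, 3]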
diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py
index 1ab147e..30ef771 100644
--- a/Lib/multiprocessing/managers.py
+++ b/Lib/multiprocessing/managers.py
@@ -167,7 +167,7 @@ class Server(object):
         while True:
             try:
                 c = self.listener.accept()
-            except (OSError, IOError):
+            except OSError:
                 continue
             t = threading.Thread(target=self.handle_request, args=(c,))
             t.daemon = True
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py
index fc9d904..bcf8a37 100644
--- a/Lib/multiprocessing/pool.py
+++ b/Lib/multiprocessing/pool.py
@@ -18,6 +18,7 @@ import queue
 import itertools
 import collections
 import time
+import traceback
 
 from multiprocessing import Process, cpu_count, TimeoutError
 from multiprocessing.util import Finalize, debug
@@ -43,6 +44,29 @@ def starmapstar(args):
     return list(itertools.starmap(args[0], args[1]))
 
 #
+# Hack to embed stringification of remote traceback in local traceback
+#
+
+class RemoteTraceback(Exception):
+    def __init__(self, tb):
+        self.tb = tb
+    def __str__(self):
+        return self.tb
+
+class ExceptionWithTraceback:
+    def __init__(self, exc, tb):
+        tb = traceback.format_exception(type(exc), exc, tb)
+        tb = ''.join(tb)
+        self.exc = exc
+        self.tb = '\n"""\n%s"""' % tb
+    def __reduce__(self):
+        return rebuild_exc, (self.exc, self.tb)
+
+def rebuild_exc(exc, tb):
+    exc.__cause__ = RemoteTraceback(tb)
+    return exc
+
+#
 # Code run by worker processes
 #
 
@@ -78,8 +102,8 @@
     while maxtasks is None or (maxtasks and completed < maxtasks):
         try:
             task = get()
-        except (EOFError, IOError):
-            debug('worker got EOFError or IOError -- exiting')
+        except (EOFError, OSError):
+            debug('worker got EOFError or OSError -- exiting')
             break
 
         if task is None:
@@ -90,6 +114,7 @@
         try:
             result = (True, func(*args, **kwds))
         except Exception as e:
+            e = ExceptionWithTraceback(e, e.__traceback__)
             result = (False, e)
         try:
             put((job, i, result))
@@ -349,7 +374,7 @@
                     break
                 try:
                     put(task)
-                except IOError:
+                except OSError:
                     debug('could not put task on queue')
                     break
             else:
@@ -371,8 +396,8 @@
             debug('task handler sending sentinel to workers')
             for p in pool:
                 put(None)
-        except IOError:
-            debug('task handler got IOError when sending sentinels')
+        except OSError:
+            debug('task handler got OSError when sending sentinels')
 
         debug('task handler exiting')
 
@@ -383,8 +408,8 @@
         while 1:
             try:
                 task = get()
-            except (IOError, EOFError):
-                debug('result handler got EOFError/IOError -- exiting')
+            except (OSError, EOFError):
+                debug('result handler got EOFError/OSError -- exiting')
                 return
 
             if thread._state:
@@ -405,8 +430,8 @@
         while cache and thread._state != TERMINATE:
             try:
                 task = get()
-            except (IOError, EOFError):
-                debug('result handler got EOFError/IOError -- exiting')
+            except (OSError, EOFError):
+                debug('result handler got EOFError/OSError -- exiting')
                 return
 
             if task is None:
@@ -428,7 +453,7 @@
                     if not outqueue._reader.poll():
                         break
                     get()
-            except (IOError, EOFError):
+            except (OSError, EOFError):
                 pass
 
         debug('result handler exiting: len(cache)=%s, thread._state=%s',
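Note: the ExceptionWithTraceback wrapper lets a worker ship a formatted copy of the remote traceback together with the exception: pickling the wrapper invokes __reduce__(), and unpickling in the parent calls rebuild_exc(), which re-attaches the text as __cause__ before the pool re-raises the error. A standalone sketch of that __reduce__ round trip using pickle directly (illustration only; the classes mirror the ones added above):

    import pickle
    import traceback

    class RemoteTraceback(Exception):
        def __init__(self, tb):
            self.tb = tb
        def __str__(self):
            return self.tb

    def rebuild_exc(exc, tb):
        # runs on the receiving side: reattach the remote traceback text
        exc.__cause__ = RemoteTraceback(tb)
        return exc

    class ExceptionWithTraceback:
        def __init__(self, exc, tb):
            self.exc = exc
            self.tb = ''.join(traceback.format_exception(type(exc), exc, tb))
        def __reduce__(self):
            # unpickling reconstructs the original exception plus its traceback text
            return rebuild_exc, (self.exc, self.tb)

    try:
        1 / 0
    except Exception as e:
        wrapped = ExceptionWithTraceback(e, e.__traceback__)

    restored = pickle.loads(pickle.dumps(wrapped))
    assert isinstance(restored, ZeroDivisionError)
    assert 'ZeroDivisionError' in str(restored.__cause__)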
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
index 37271fb..ec188ee 100644
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -22,7 +22,7 @@ import _multiprocessing
 from multiprocessing.connection import Pipe
 from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition
 from multiprocessing.util import debug, info, Finalize, register_after_fork
-from multiprocessing.forking import assert_spawning
+from multiprocessing.forking import assert_spawning, ForkingPickler
 
 #
 # Queue type using a pipe, buffer and thread
@@ -69,8 +69,8 @@ class Queue(object):
         self._joincancelled = False
         self._closed = False
         self._close = None
-        self._send = self._writer.send
-        self._recv = self._reader.recv
+        self._send_bytes = self._writer.send_bytes
+        self._recv_bytes = self._reader.recv_bytes
         self._poll = self._reader.poll
 
     def put(self, obj, block=True, timeout=None):
@@ -89,14 +89,9 @@
 
     def get(self, block=True, timeout=None):
         if block and timeout is None:
-            self._rlock.acquire()
-            try:
-                res = self._recv()
-                self._sem.release()
-                return res
-            finally:
-                self._rlock.release()
-
+            with self._rlock:
+                res = self._recv_bytes()
+            self._sem.release()
         else:
             if block:
                 deadline = time.time() + timeout
@@ -109,11 +104,12 @@
                         raise Empty
                 elif not self._poll():
                     raise Empty
-                res = self._recv()
+                res = self._recv_bytes()
                 self._sem.release()
-                return res
             finally:
                 self._rlock.release()
+        # unserialize the data after having released the lock
+        return ForkingPickler.loads(res)
 
     def qsize(self):
         # Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
@@ -158,7 +154,7 @@
         self._buffer.clear()
         self._thread = threading.Thread(
             target=Queue._feed,
-            args=(self._buffer, self._notempty, self._send,
+            args=(self._buffer, self._notempty, self._send_bytes,
                   self._wlock, self._writer.close, self._ignore_epipe),
             name='QueueFeederThread'
             )
@@ -210,7 +206,7 @@
             notempty.release()
 
     @staticmethod
-    def _feed(buffer, notempty, send, writelock, close, ignore_epipe):
+    def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe):
         debug('starting thread to feed data to pipe')
 
         from .util import is_exiting
@@ -241,12 +237,14 @@
                             close()
                             return
 
+                        # serialize the data before acquiring the lock
+                        obj = ForkingPickler.dumps(obj)
                         if wacquire is None:
-                            send(obj)
+                            send_bytes(obj)
                         else:
                             wacquire()
                             try:
-                                send(obj)
+                                send_bytes(obj)
                             finally:
                                 wrelease()
                 except IndexError:
@@ -340,7 +338,6 @@ class SimpleQueue(object):
             self._wlock = None
         else:
             self._wlock = Lock()
-        self._make_methods()
 
     def empty(self):
         return not self._poll()
@@ -351,29 +348,19 @@
 
     def __setstate__(self, state):
         (self._reader, self._writer, self._rlock, self._wlock) = state
-        self._make_methods()
 
-    def _make_methods(self):
-        recv = self._reader.recv
-        racquire, rrelease = self._rlock.acquire, self._rlock.release
-        def get():
-            racquire()
-            try:
-                return recv()
-            finally:
-                rrelease()
-        self.get = get
+    def get(self):
+        with self._rlock:
+            res = self._reader.recv_bytes()
+        # unserialize the data after having released the lock
+        return ForkingPickler.loads(res)
 
+    def put(self, obj):
+        # serialize the data before acquiring the lock
+        obj = ForkingPickler.dumps(obj)
         if self._wlock is None:
             # writes to a message oriented win32 pipe are atomic
-            self.put = self._writer.send
+            self._writer.send_bytes(obj)
         else:
-            send = self._writer.send
-            wacquire, wrelease = self._wlock.acquire, self._wlock.release
-            def put(obj):
-                wacquire()
-                try:
-                    return send(obj)
-                finally:
-                    wrelease()
-            self.put = put
+            with self._wlock:
+                self._writer.send_bytes(obj)
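Note: Queue and SimpleQueue now pickle before taking the write lock and unpickle only after releasing the read lock, so serialization of large objects no longer blocks other producers and consumers. A minimal standalone sketch of that locking discipline, using plain pickle and a non-duplex Pipe (illustration only, not the patch's implementation):

    import pickle
    from multiprocessing import Lock, Pipe

    class TinySimpleQueue:
        # pared-down illustration of the serialize-outside-the-lock pattern
        def __init__(self):
            self._reader, self._writer = Pipe(duplex=False)
            self._rlock = Lock()
            self._wlock = Lock()

        def put(self, obj):
            data = pickle.dumps(obj)           # serialize before acquiring the lock
            with self._wlock:
                self._writer.send_bytes(data)  # hold the lock only for the raw write

        def get(self):
            with self._rlock:
                data = self._reader.recv_bytes()
            return pickle.loads(data)          # unserialize after releasing the lock

    q = TinySimpleQueue()
    q.put(('hello', 123))
    assert q.get() == ('hello', 123)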