diff options
-rw-r--r-- | Lib/multiprocessing/forking.py | 63 | ||||
-rw-r--r-- | Lib/multiprocessing/managers.py | 7 | ||||
-rw-r--r-- | Lib/multiprocessing/reduction.py | 15 | ||||
-rw-r--r-- | Lib/multiprocessing/sharedctypes.py | 6 | ||||
-rw-r--r-- | Lib/multiprocessing/util.py | 33 |
5 files changed, 66 insertions, 58 deletions
diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py index 47d54f2..e1a64df 100644 --- a/Lib/multiprocessing/forking.py +++ b/Lib/multiprocessing/forking.py @@ -12,7 +12,7 @@ import signal from multiprocessing import util, process -__all__ = ['Popen', 'assert_spawning', 'exit', 'duplicate', 'close'] +__all__ = ['Popen', 'assert_spawning', 'exit', 'duplicate', 'close', 'ForkingPickler'] # # Check that the current thread is spawning a child process @@ -26,6 +26,50 @@ def assert_spawning(self): ) # +# Try making some callable types picklable +# + +from pickle import _Pickler as Pickler +class ForkingPickler(Pickler): + dispatch = Pickler.dispatch.copy() + @classmethod + def register(cls, type, reduce): + def dispatcher(self, obj): + rv = reduce(obj) + if isinstance(rv, str): + self.save_global(obj, rv) + else: + self.save_reduce(obj=obj, *rv) + cls.dispatch[type] = dispatcher + +def _reduce_method(m): + if m.__self__ is None: + return getattr, (m.__class__, m.__func__.__name__) + else: + return getattr, (m.__self__, m.__func__.__name__) +class _C: + def f(self): + pass +ForkingPickler.register(type(_C().f), _reduce_method) + + +def _reduce_method_descriptor(m): + return getattr, (m.__objclass__, m.__name__) +ForkingPickler.register(type(list.append), _reduce_method_descriptor) +ForkingPickler.register(type(int.__add__), _reduce_method_descriptor) + +try: + from functools import partial +except ImportError: + pass +else: + def _reduce_partial(p): + return _rebuild_partial, (p.func, p.args, p.keywords or {}) + def _rebuild_partial(func, args, keywords): + return partial(func, *args, **keywords) + ForkingPickler.register(partial, _reduce_partial) + +# # Unix # @@ -105,16 +149,18 @@ else: import _thread import msvcrt import _subprocess - import copyreg import time from ._multiprocessing import win32, Connection, PipeConnection from .util import Finalize - try: - from cPickle import dump, load, HIGHEST_PROTOCOL - except ImportError: - from pickle import dump, load, HIGHEST_PROTOCOL + #try: + # from cPickle import dump, load, HIGHEST_PROTOCOL + #except ImportError: + from pickle import load, HIGHEST_PROTOCOL + + def dump(obj, file, protocol=None): + ForkingPickler(file, protocol).dump(obj) # # @@ -346,9 +392,8 @@ else: return type(conn), (Popen.duplicate_for_child(conn.fileno()), conn.readable, conn.writable) - copyreg.pickle(Connection, reduce_connection) - copyreg.pickle(PipeConnection, reduce_connection) - + ForkingPickler.register(Connection, reduce_connection) + ForkingPickler.register(PipeConnection, reduce_connection) # # Prepare current process diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py index d6b16e5..d1522c2 100644 --- a/Lib/multiprocessing/managers.py +++ b/Lib/multiprocessing/managers.py @@ -18,13 +18,12 @@ import sys import weakref import threading import array -import copyreg import queue from traceback import format_exc from multiprocessing import Process, current_process, active_children, Pool, util, connection from multiprocessing.process import AuthenticationString -from multiprocessing.forking import exit, Popen, assert_spawning +from multiprocessing.forking import exit, Popen, assert_spawning, ForkingPickler from multiprocessing.util import Finalize, info try: @@ -38,14 +37,14 @@ except ImportError: def reduce_array(a): return array.array, (a.typecode, a.tostring()) -copyreg.pickle(array.array, reduce_array) +ForkingPickler.register(array.array, reduce_array) view_types = [type(getattr({}, name)()) for name in ('items','keys','values')] if view_types[0] is not list: # only needed in Py3.0 def rebuild_as_list(obj): return list, (list(obj),) for view_type in view_types: - copyreg.pickle(view_type, rebuild_as_list) + ForkingPickler.register(view_type, rebuild_as_list) # # Type for identifying shared objects diff --git a/Lib/multiprocessing/reduction.py b/Lib/multiprocessing/reduction.py index 010d871..1813729 100644 --- a/Lib/multiprocessing/reduction.py +++ b/Lib/multiprocessing/reduction.py @@ -13,11 +13,10 @@ import os import sys import socket import threading -import copyreg import _multiprocessing from multiprocessing import current_process -from multiprocessing.forking import Popen, duplicate, close +from multiprocessing.forking import Popen, duplicate, close, ForkingPickler from multiprocessing.util import register_after_fork, debug, sub_debug from multiprocessing.connection import Client, Listener @@ -134,7 +133,7 @@ def rebuild_handle(pickled_data): return new_handle # -# Register `_multiprocessing.Connection` with `copy_reg` +# Register `_multiprocessing.Connection` with `ForkingPickler` # def reduce_connection(conn): @@ -147,10 +146,10 @@ def rebuild_connection(reduced_handle, readable, writable): handle, readable=readable, writable=writable ) -copyreg.pickle(_multiprocessing.Connection, reduce_connection) +ForkingPickler.register(_multiprocessing.Connection, reduce_connection) # -# Register `socket.socket` with `copy_reg` +# Register `socket.socket` with `ForkingPickler` # def fromfd(fd, family, type_, proto=0): @@ -169,10 +168,10 @@ def rebuild_socket(reduced_handle, family, type_, proto): close(fd) return _sock -copyreg.pickle(socket.socket, reduce_socket) +ForkingPickler.register(socket.socket, reduce_socket) # -# Register `_multiprocessing.PipeConnection` with `copy_reg` +# Register `_multiprocessing.PipeConnection` with `ForkingPickler` # if sys.platform == 'win32': @@ -187,4 +186,4 @@ if sys.platform == 'win32': handle, readable=readable, writable=writable ) - copyreg.pickle(_multiprocessing.PipeConnection, reduce_pipe_connection) + ForkingPickler.register(_multiprocessing.PipeConnection, reduce_pipe_connection) diff --git a/Lib/multiprocessing/sharedctypes.py b/Lib/multiprocessing/sharedctypes.py index 45826dd..b94cd52 100644 --- a/Lib/multiprocessing/sharedctypes.py +++ b/Lib/multiprocessing/sharedctypes.py @@ -9,10 +9,9 @@ import sys import ctypes import weakref -import copyreg from multiprocessing import heap, RLock -from multiprocessing.forking import assert_spawning +from multiprocessing.forking import assert_spawning, ForkingPickler __all__ = ['RawValue', 'RawArray', 'Value', 'Array', 'copy', 'synchronized'] @@ -124,8 +123,7 @@ def reduce_ctype(obj): def rebuild_ctype(type_, wrapper, length): if length is not None: type_ = type_ * length - if sys.platform == 'win32' and type_ not in copyreg.dispatch_table: - copyreg.pickle(type_, reduce_ctype) + ForkingPickler.register(type_, reduce_ctype) obj = type_.from_address(wrapper.get_address()) obj._wrapper = wrapper return obj diff --git a/Lib/multiprocessing/util.py b/Lib/multiprocessing/util.py index aae38c7..3fe0175 100644 --- a/Lib/multiprocessing/util.py +++ b/Lib/multiprocessing/util.py @@ -8,7 +8,6 @@ import itertools import weakref -import copyreg import atexit import threading # we want threading to install it's # cleanup function before multiprocessing does @@ -302,35 +301,3 @@ class ForkAwareLocal(threading.local): register_after_fork(self, lambda obj : obj.__dict__.clear()) def __reduce__(self): return type(self), () - -# -# Try making some callable types picklable -# - -def _reduce_method(m): - if m.__self__ is None: - return getattr, (m.__self__.__class__, m.__func__.__name__) - else: - return getattr, (m.__self__, m.__func__.__name__) -copyreg.pickle(type(Finalize.__init__), _reduce_method) - -def _reduce_method_descriptor(m): - return getattr, (m.__objclass__, m.__name__) -copyreg.pickle(type(list.append), _reduce_method_descriptor) -copyreg.pickle(type(int.__add__), _reduce_method_descriptor) - -def _reduce_builtin_function_or_method(m): - return getattr, (m.__self__, m.__name__) -copyreg.pickle(type(list().append), _reduce_builtin_function_or_method) -copyreg.pickle(type(int().__add__), _reduce_builtin_function_or_method) - -try: - from functools import partial -except ImportError: - pass -else: - def _reduce_partial(p): - return _rebuild_partial, (p.func, p.args, p.keywords or {}) - def _rebuild_partial(func, args, keywords): - return partial(func, *args, **keywords) - copyreg.pickle(partial, _reduce_partial) |