diff options
author | Richard Oudkerk <shibturn@gmail.com> | 2013-08-14 14:35:41 (GMT) |
---|---|---|
committer | Richard Oudkerk <shibturn@gmail.com> | 2013-08-14 14:35:41 (GMT) |
commit | 84ed9a68bd9a13252b376b21a9167dabae254325 (patch) | |
tree | ec8daa39fcf64b658bddf52f56ae47c0bdc2b091 /Lib/multiprocessing/managers.py | |
parent | d06eeb4a2492b59d34ab69a2046dcae1f10ec593 (diff) | |
download | cpython-84ed9a68bd9a13252b376b21a9167dabae254325.zip cpython-84ed9a68bd9a13252b376b21a9167dabae254325.tar.gz cpython-84ed9a68bd9a13252b376b21a9167dabae254325.tar.bz2 |
Issue #8713: Support alternative start methods in multiprocessing on Unix.
See http://hg.python.org/sandbox/sbt#spawn
Diffstat (limited to 'Lib/multiprocessing/managers.py')
-rw-r--r-- | Lib/multiprocessing/managers.py | 44 |
1 files changed, 23 insertions, 21 deletions
diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py index 36cd650..f580e9e 100644 --- a/Lib/multiprocessing/managers.py +++ b/Lib/multiprocessing/managers.py @@ -19,11 +19,15 @@ import threading import array import queue -from traceback import format_exc -from multiprocessing import Process, current_process, active_children, Pool, util, connection -from multiprocessing.process import AuthenticationString -from multiprocessing.forking import Popen, ForkingPickler from time import time as _time +from traceback import format_exc + +from . import connection +from . import pool +from . import process +from . import popen +from . import reduction +from . import util # # Register some things for pickling @@ -31,16 +35,14 @@ from time import time as _time def reduce_array(a): return array.array, (a.typecode, a.tobytes()) -ForkingPickler.register(array.array, reduce_array) +reduction.register(array.array, reduce_array) view_types = [type(getattr({}, name)()) for name in ('items','keys','values')] if view_types[0] is not list: # only needed in Py3.0 def rebuild_as_list(obj): return list, (list(obj),) for view_type in view_types: - ForkingPickler.register(view_type, rebuild_as_list) - import copyreg - copyreg.pickle(view_type, rebuild_as_list) + reduction.register(view_type, rebuild_as_list) # # Type for identifying shared objects @@ -130,7 +132,7 @@ class Server(object): def __init__(self, registry, address, authkey, serializer): assert isinstance(authkey, bytes) self.registry = registry - self.authkey = AuthenticationString(authkey) + self.authkey = process.AuthenticationString(authkey) Listener, Client = listener_client[serializer] # do authentication later @@ -146,7 +148,7 @@ class Server(object): Run the server forever ''' self.stop_event = threading.Event() - current_process()._manager_server = self + process.current_process()._manager_server = self try: accepter = threading.Thread(target=self.accepter) accepter.daemon = True @@ -438,9 +440,9 @@ class BaseManager(object): def __init__(self, address=None, authkey=None, serializer='pickle'): if authkey is None: - authkey = current_process().authkey + authkey = process.current_process().authkey self._address = address # XXX not final address if eg ('', 0) - self._authkey = AuthenticationString(authkey) + self._authkey = process.AuthenticationString(authkey) self._state = State() self._state.value = State.INITIAL self._serializer = serializer @@ -476,7 +478,7 @@ class BaseManager(object): reader, writer = connection.Pipe(duplex=False) # spawn process which runs a server - self._process = Process( + self._process = process.Process( target=type(self)._run_server, args=(self._registry, self._address, self._authkey, self._serializer, writer, initializer, initargs), @@ -691,11 +693,11 @@ class BaseProxy(object): self._Client = listener_client[serializer][1] if authkey is not None: - self._authkey = AuthenticationString(authkey) + self._authkey = process.AuthenticationString(authkey) elif self._manager is not None: self._authkey = self._manager._authkey else: - self._authkey = current_process().authkey + self._authkey = process.current_process().authkey if incref: self._incref() @@ -704,7 +706,7 @@ class BaseProxy(object): def _connect(self): util.debug('making connection to manager') - name = current_process().name + name = process.current_process().name if threading.current_thread().name != 'MainThread': name += '|' + threading.current_thread().name conn = self._Client(self._token.address, authkey=self._authkey) @@ -798,7 +800,7 @@ class BaseProxy(object): def __reduce__(self): kwds = {} - if Popen.thread_is_spawning(): + if popen.get_spawning_popen() is not None: kwds['authkey'] = self._authkey if getattr(self, '_isauto', False): @@ -835,14 +837,14 @@ def RebuildProxy(func, token, serializer, kwds): If possible the shared object is returned, or otherwise a proxy for it. ''' - server = getattr(current_process(), '_manager_server', None) + server = getattr(process.current_process(), '_manager_server', None) if server and server.address == token.address: return server.id_to_obj[token.id][0] else: incref = ( kwds.pop('incref', True) and - not getattr(current_process(), '_inheriting', False) + not getattr(process.current_process(), '_inheriting', False) ) return func(token, serializer, incref=incref, **kwds) @@ -889,7 +891,7 @@ def AutoProxy(token, serializer, manager=None, authkey=None, if authkey is None and manager is not None: authkey = manager._authkey if authkey is None: - authkey = current_process().authkey + authkey = process.current_process().authkey ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed) proxy = ProxyType(token, serializer, manager=manager, authkey=authkey, @@ -1109,7 +1111,7 @@ SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore, AcquirerProxy) SyncManager.register('Condition', threading.Condition, ConditionProxy) SyncManager.register('Barrier', threading.Barrier, BarrierProxy) -SyncManager.register('Pool', Pool, PoolProxy) +SyncManager.register('Pool', pool.Pool, PoolProxy) SyncManager.register('list', list, ListProxy) SyncManager.register('dict', dict, DictProxy) SyncManager.register('Value', Value, ValueProxy) |