diff options
Diffstat (limited to 'Lib/multiprocessing/managers.py')
-rw-r--r-- | Lib/multiprocessing/managers.py | 62 |
1 files changed, 36 insertions, 26 deletions
diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py index 96056b0..66d46fc 100644 --- a/Lib/multiprocessing/managers.py +++ b/Lib/multiprocessing/managers.py @@ -19,11 +19,16 @@ import threading import array import queue -from traceback import format_exc -from multiprocessing import Process, current_process, active_children, Pool, util, connection -from multiprocessing.process import AuthenticationString -from multiprocessing.forking import Popen, ForkingPickler from time import time as _time +from traceback import format_exc + +from . import connection +from . import context +from . import pool +from . import process +from . import reduction +from . import util +from . import get_context # # Register some things for pickling @@ -31,16 +36,14 @@ from time import time as _time def reduce_array(a): return array.array, (a.typecode, a.tobytes()) -ForkingPickler.register(array.array, reduce_array) +reduction.register(array.array, reduce_array) view_types = [type(getattr({}, name)()) for name in ('items','keys','values')] if view_types[0] is not list: # only needed in Py3.0 def rebuild_as_list(obj): return list, (list(obj),) for view_type in view_types: - ForkingPickler.register(view_type, rebuild_as_list) - import copyreg - copyreg.pickle(view_type, rebuild_as_list) + reduction.register(view_type, rebuild_as_list) # # Type for identifying shared objects @@ -130,7 +133,7 @@ class Server(object): def __init__(self, registry, address, authkey, serializer): assert isinstance(authkey, bytes) self.registry = registry - self.authkey = AuthenticationString(authkey) + self.authkey = process.AuthenticationString(authkey) Listener, Client = listener_client[serializer] # do authentication later @@ -146,7 +149,7 @@ class Server(object): Run the server forever ''' self.stop_event = threading.Event() - current_process()._manager_server = self + process.current_process()._manager_server = self try: accepter = threading.Thread(target=self.accepter) accepter.daemon = True @@ -167,7 +170,7 @@ class Server(object): while True: try: c = self.listener.accept() - except (OSError, IOError): + except OSError: continue t = threading.Thread(target=self.handle_request, args=(c,)) t.daemon = True @@ -436,15 +439,17 @@ class BaseManager(object): _registry = {} _Server = Server - def __init__(self, address=None, authkey=None, serializer='pickle'): + def __init__(self, address=None, authkey=None, serializer='pickle', + ctx=None): if authkey is None: - authkey = current_process().authkey + authkey = process.current_process().authkey self._address = address # XXX not final address if eg ('', 0) - self._authkey = AuthenticationString(authkey) + self._authkey = process.AuthenticationString(authkey) self._state = State() self._state.value = State.INITIAL self._serializer = serializer self._Listener, self._Client = listener_client[serializer] + self._ctx = ctx or get_context() def get_server(self): ''' @@ -476,7 +481,7 @@ class BaseManager(object): reader, writer = connection.Pipe(duplex=False) # spawn process which runs a server - self._process = Process( + self._process = self._ctx.Process( target=type(self)._run_server, args=(self._registry, self._address, self._authkey, self._serializer, writer, initializer, initargs), @@ -691,11 +696,11 @@ class BaseProxy(object): self._Client = listener_client[serializer][1] if authkey is not None: - self._authkey = AuthenticationString(authkey) + self._authkey = process.AuthenticationString(authkey) elif self._manager is not None: self._authkey = self._manager._authkey else: - self._authkey = current_process().authkey + self._authkey = process.current_process().authkey if incref: self._incref() @@ -704,7 +709,7 @@ class BaseProxy(object): def _connect(self): util.debug('making connection to manager') - name = current_process().name + name = process.current_process().name if threading.current_thread().name != 'MainThread': name += '|' + threading.current_thread().name conn = self._Client(self._token.address, authkey=self._authkey) @@ -798,7 +803,7 @@ class BaseProxy(object): def __reduce__(self): kwds = {} - if Popen.thread_is_spawning(): + if context.get_spawning_popen() is not None: kwds['authkey'] = self._authkey if getattr(self, '_isauto', False): @@ -835,14 +840,14 @@ def RebuildProxy(func, token, serializer, kwds): If possible the shared object is returned, or otherwise a proxy for it. ''' - server = getattr(current_process(), '_manager_server', None) + server = getattr(process.current_process(), '_manager_server', None) if server and server.address == token.address: return server.id_to_obj[token.id][0] else: incref = ( kwds.pop('incref', True) and - not getattr(current_process(), '_inheriting', False) + not getattr(process.current_process(), '_inheriting', False) ) return func(token, serializer, incref=incref, **kwds) @@ -889,7 +894,7 @@ def AutoProxy(token, serializer, manager=None, authkey=None, if authkey is None and manager is not None: authkey = manager._authkey if authkey is None: - authkey = current_process().authkey + authkey = process.current_process().authkey ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed) proxy = ProxyType(token, serializer, manager=manager, authkey=authkey, @@ -1072,17 +1077,22 @@ ArrayProxy = MakeProxyType('ArrayProxy', ( )) -PoolProxy = MakeProxyType('PoolProxy', ( +BasePoolProxy = MakeProxyType('PoolProxy', ( 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join', - 'map', 'map_async', 'starmap', 'starmap_async', 'terminate' + 'map', 'map_async', 'starmap', 'starmap_async', 'terminate', )) -PoolProxy._method_to_typeid_ = { +BasePoolProxy._method_to_typeid_ = { 'apply_async': 'AsyncResult', 'map_async': 'AsyncResult', 'starmap_async': 'AsyncResult', 'imap': 'Iterator', 'imap_unordered': 'Iterator' } +class PoolProxy(BasePoolProxy): + def __enter__(self): + return self + def __exit__(self, exc_type, exc_val, exc_tb): + self.terminate() # # Definition of SyncManager @@ -1109,7 +1119,7 @@ SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore, AcquirerProxy) SyncManager.register('Condition', threading.Condition, ConditionProxy) SyncManager.register('Barrier', threading.Barrier, BarrierProxy) -SyncManager.register('Pool', Pool, PoolProxy) +SyncManager.register('Pool', pool.Pool, PoolProxy) SyncManager.register('list', list, ListProxy) SyncManager.register('dict', dict, DictProxy) SyncManager.register('Value', Value, ValueProxy) |