Diffstat (limited to 'Lib/multiprocessing')
-rw-r--r-- | Lib/multiprocessing/managers.py | 95
-rw-r--r-- | Lib/multiprocessing/pool.py | 20
2 files changed, 82 insertions, 33 deletions
diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py
index b175470..c4dc972 100644
--- a/Lib/multiprocessing/managers.py
+++ b/Lib/multiprocessing/managers.py
@@ -142,7 +142,8 @@ class Server(object):

         self.id_to_obj = {'0': (None, ())}
         self.id_to_refcount = {}
-        self.mutex = threading.RLock()
+        self.id_to_local_proxy_obj = {}
+        self.mutex = threading.Lock()

     def serve_forever(self):
         '''
@@ -227,7 +228,14 @@ class Server(object):
                 methodname = obj = None
                 request = recv()
                 ident, methodname, args, kwds = request
-                obj, exposed, gettypeid = id_to_obj[ident]
+                try:
+                    obj, exposed, gettypeid = id_to_obj[ident]
+                except KeyError as ke:
+                    try:
+                        obj, exposed, gettypeid = \
+                            self.id_to_local_proxy_obj[ident]
+                    except KeyError as second_ke:
+                        raise ke

                 if methodname not in exposed:
                     raise AttributeError(
@@ -308,7 +316,7 @@ class Server(object):
         '''
         with self.mutex:
             result = []
-            keys = list(self.id_to_obj.keys())
+            keys = list(self.id_to_refcount.keys())
             keys.sort()
             for ident in keys:
                 if ident != '0':
@@ -321,7 +329,8 @@ class Server(object):
         '''
         Number of shared objects
         '''
-        return len(self.id_to_obj) - 1      # don't count ident='0'
+        # Doesn't use (len(self.id_to_obj) - 1) as we shouldn't count ident='0'
+        return len(self.id_to_refcount)

     def shutdown(self, c):
         '''
@@ -363,13 +372,9 @@ class Server(object):
             self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
             if ident not in self.id_to_refcount:
                 self.id_to_refcount[ident] = 0
-            # increment the reference count immediately, to avoid
-            # this object being garbage collected before a Proxy
-            # object for it can be created. The caller of create()
-            # is responsible for doing a decref once the Proxy object
-            # has been created.
-            self.incref(c, ident)
-            return ident, tuple(exposed)
+
+        self.incref(c, ident)
+        return ident, tuple(exposed)

     def get_methods(self, c, token):
         '''
@@ -387,15 +392,45 @@ class Server(object):

     def incref(self, c, ident):
         with self.mutex:
-            self.id_to_refcount[ident] += 1
+            try:
+                self.id_to_refcount[ident] += 1
+            except KeyError as ke:
+                # If no external references exist but an internal (to the
+                # manager) still does and a new external reference is created
+                # from it, restore the manager's tracking of it from the
+                # previously stashed internal ref.
+                if ident in self.id_to_local_proxy_obj:
+                    self.id_to_refcount[ident] = 1
+                    self.id_to_obj[ident] = \
+                        self.id_to_local_proxy_obj[ident]
+                    obj, exposed, gettypeid = self.id_to_obj[ident]
+                    util.debug('Server re-enabled tracking & INCREF %r', ident)
+                else:
+                    raise ke

     def decref(self, c, ident):
+        if ident not in self.id_to_refcount and \
+            ident in self.id_to_local_proxy_obj:
+            util.debug('Server DECREF skipping %r', ident)
+            return
+
         with self.mutex:
             assert self.id_to_refcount[ident] >= 1
             self.id_to_refcount[ident] -= 1
             if self.id_to_refcount[ident] == 0:
-                del self.id_to_obj[ident], self.id_to_refcount[ident]
-                util.debug('disposing of obj with id %r', ident)
+                del self.id_to_refcount[ident]
+
+        if ident not in self.id_to_refcount:
+            # Two-step process in case the object turns out to contain other
+            # proxy objects (e.g. a managed list of managed lists).
+            # Otherwise, deleting self.id_to_obj[ident] would trigger the
+            # deleting of the stored value (another managed object) which would
+            # in turn attempt to acquire the mutex that is already held here.
+            self.id_to_obj[ident] = (None, (), None)  # thread-safe
+            util.debug('disposing of obj with id %r', ident)
+            with self.mutex:
+                del self.id_to_obj[ident]
+

 #
 # Class to represent state of a manager
@@ -658,7 +693,7 @@ class BaseProxy(object):
     _mutex = util.ForkAwareThreadLock()

     def __init__(self, token, serializer, manager=None,
-                 authkey=None, exposed=None, incref=True):
+                 authkey=None, exposed=None, incref=True, manager_owned=False):
         with BaseProxy._mutex:
             tls_idset = BaseProxy._address_to_local.get(token.address, None)
             if tls_idset is None:
@@ -680,6 +715,12 @@ class BaseProxy(object):
         self._serializer = serializer
         self._Client = listener_client[serializer][1]

+        # Should be set to True only when a proxy object is being created
+        # on the manager server; primary use case: nested proxy objects.
+        # RebuildProxy detects when a proxy is being created on the manager
+        # and sets this value appropriately.
+        self._owned_by_manager = manager_owned
+
         if authkey is not None:
             self._authkey = process.AuthenticationString(authkey)
         elif self._manager is not None:
@@ -738,6 +779,10 @@ class BaseProxy(object):
         return self._callmethod('#GETVALUE')

     def _incref(self):
+        if self._owned_by_manager:
+            util.debug('owned_by_manager skipped INCREF of %r', self._token.id)
+            return
+
         conn = self._Client(self._token.address, authkey=self._authkey)
         dispatch(conn, None, 'incref', (self._id,))
         util.debug('INCREF %r', self._token.id)
@@ -822,19 +867,19 @@
 def RebuildProxy(func, token, serializer, kwds):
     '''
     Function used for unpickling proxy objects.
-
-    If possible the shared object is returned, or otherwise a proxy for it.
     '''
     server = getattr(process.current_process(), '_manager_server', None)
-
     if server and server.address == token.address:
-        return server.id_to_obj[token.id][0]
-    else:
-        incref = (
-            kwds.pop('incref', True) and
-            not getattr(process.current_process(), '_inheriting', False)
-        )
-        return func(token, serializer, incref=incref, **kwds)
+        util.debug('Rebuild a proxy owned by manager, token=%r', token)
+        kwds['manager_owned'] = True
+        if token.id not in server.id_to_local_proxy_obj:
+            server.id_to_local_proxy_obj[token.id] = \
+                server.id_to_obj[token.id]
+    incref = (
+        kwds.pop('incref', True) and
+        not getattr(process.current_process(), '_inheriting', False)
+    )
+    return func(token, serializer, incref=incref, **kwds)

 #
 # Functions to create proxies and proxy types
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py
index 6d25469..ffdf426 100644
--- a/Lib/multiprocessing/pool.py
+++ b/Lib/multiprocessing/pool.py
@@ -638,22 +638,26 @@ class MapResult(ApplyResult):
             self._number_left = length//chunksize + bool(length % chunksize)

     def _set(self, i, success_result):
+        self._number_left -= 1
         success, result = success_result
-        if success:
+        if success and self._success:
             self._value[i*self._chunksize:(i+1)*self._chunksize] = result
-            self._number_left -= 1
             if self._number_left == 0:
                 if self._callback:
                     self._callback(self._value)
                 del self._cache[self._job]
                 self._event.set()
         else:
-            self._success = False
-            self._value = result
-            if self._error_callback:
-                self._error_callback(self._value)
-            del self._cache[self._job]
-            self._event.set()
+            if not success and self._success:
+                # only store first exception
+                self._success = False
+                self._value = result
+            if self._number_left == 0:
+                # only consider the result ready once all jobs are done
+                if self._error_callback:
+                    self._error_callback(self._value)
+                del self._cache[self._job]
+                self._event.set()

 #
 # Class whose instances are returned by `Pool.imap()`
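
The managers.py changes above are what make nested proxy objects usable: a proxy unpickled on the manager server is now recorded in id_to_local_proxy_obj (and marked manager_owned) instead of being collapsed to its referent, so reference counting keeps working when one managed object is stored inside another. A minimal usage sketch of the pattern this enables follows; it is not part of the patch, and the variable names (inner, outer) are invented for illustration:

    import multiprocessing

    if __name__ == '__main__':
        with multiprocessing.Manager() as manager:
            inner = manager.list()           # proxy for a list held by the manager
            outer = manager.dict()           # proxy for a dict held by the manager
            outer['items'] = inner           # nest one managed object inside another
            outer['items'].append('hello')   # mutate through the nested proxy
            print(list(inner))               # ['hello'] -- both proxies see the update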
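The pool.py change reworks MapResult._set so that _number_left is decremented for every chunk (successful or not), only the first exception is kept, and the result is marked ready (and error_callback fired) only after every chunk has reported back. A rough sketch of the user-visible behaviour, assuming a worker that fails for one input (the function work and the callbacks are hypothetical, not from the patch):

    import multiprocessing

    def work(x):
        if x == 3:
            raise ValueError('bad input: %r' % x)
        return x * x

    if __name__ == '__main__':
        with multiprocessing.Pool(2) as pool:
            res = pool.map_async(work, range(10),
                                 error_callback=lambda exc: print('first error:', exc))
            try:
                res.get()        # raises only once all chunks have finished
            except ValueError as exc:
                print('map failed with:', exc)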