summaryrefslogtreecommitdiffstats
path: root/Lib/multiprocessing
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/multiprocessing')
-rw-r--r--Lib/multiprocessing/managers.py95
-rw-r--r--Lib/multiprocessing/pool.py20
2 files changed, 82 insertions, 33 deletions
diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py
index b175470..c4dc972 100644
--- a/Lib/multiprocessing/managers.py
+++ b/Lib/multiprocessing/managers.py
@@ -142,7 +142,8 @@ class Server(object):
self.id_to_obj = {'0': (None, ())}
self.id_to_refcount = {}
- self.mutex = threading.RLock()
+ self.id_to_local_proxy_obj = {}
+ self.mutex = threading.Lock()
def serve_forever(self):
'''
@@ -227,7 +228,14 @@ class Server(object):
methodname = obj = None
request = recv()
ident, methodname, args, kwds = request
- obj, exposed, gettypeid = id_to_obj[ident]
+ try:
+ obj, exposed, gettypeid = id_to_obj[ident]
+ except KeyError as ke:
+ try:
+ obj, exposed, gettypeid = \
+ self.id_to_local_proxy_obj[ident]
+ except KeyError as second_ke:
+ raise ke
if methodname not in exposed:
raise AttributeError(
@@ -308,7 +316,7 @@ class Server(object):
'''
with self.mutex:
result = []
- keys = list(self.id_to_obj.keys())
+ keys = list(self.id_to_refcount.keys())
keys.sort()
for ident in keys:
if ident != '0':
@@ -321,7 +329,8 @@ class Server(object):
'''
Number of shared objects
'''
- return len(self.id_to_obj) - 1 # don't count ident='0'
+ # Doesn't use (len(self.id_to_obj) - 1) as we shouldn't count ident='0'
+ return len(self.id_to_refcount)
def shutdown(self, c):
'''
@@ -363,13 +372,9 @@ class Server(object):
self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
if ident not in self.id_to_refcount:
self.id_to_refcount[ident] = 0
- # increment the reference count immediately, to avoid
- # this object being garbage collected before a Proxy
- # object for it can be created. The caller of create()
- # is responsible for doing a decref once the Proxy object
- # has been created.
- self.incref(c, ident)
- return ident, tuple(exposed)
+
+ self.incref(c, ident)
+ return ident, tuple(exposed)
def get_methods(self, c, token):
'''
@@ -387,15 +392,45 @@ class Server(object):
def incref(self, c, ident):
with self.mutex:
- self.id_to_refcount[ident] += 1
+ try:
+ self.id_to_refcount[ident] += 1
+ except KeyError as ke:
+ # If no external references exist but an internal (to the
+ # manager) still does and a new external reference is created
+ # from it, restore the manager's tracking of it from the
+ # previously stashed internal ref.
+ if ident in self.id_to_local_proxy_obj:
+ self.id_to_refcount[ident] = 1
+ self.id_to_obj[ident] = \
+ self.id_to_local_proxy_obj[ident]
+ obj, exposed, gettypeid = self.id_to_obj[ident]
+ util.debug('Server re-enabled tracking & INCREF %r', ident)
+ else:
+ raise ke
def decref(self, c, ident):
+ if ident not in self.id_to_refcount and \
+ ident in self.id_to_local_proxy_obj:
+ util.debug('Server DECREF skipping %r', ident)
+ return
+
with self.mutex:
assert self.id_to_refcount[ident] >= 1
self.id_to_refcount[ident] -= 1
if self.id_to_refcount[ident] == 0:
- del self.id_to_obj[ident], self.id_to_refcount[ident]
- util.debug('disposing of obj with id %r', ident)
+ del self.id_to_refcount[ident]
+
+ if ident not in self.id_to_refcount:
+ # Two-step process in case the object turns out to contain other
+ # proxy objects (e.g. a managed list of managed lists).
+ # Otherwise, deleting self.id_to_obj[ident] would trigger the
+ # deleting of the stored value (another managed object) which would
+ # in turn attempt to acquire the mutex that is already held here.
+ self.id_to_obj[ident] = (None, (), None) # thread-safe
+ util.debug('disposing of obj with id %r', ident)
+ with self.mutex:
+ del self.id_to_obj[ident]
+
#
# Class to represent state of a manager
@@ -658,7 +693,7 @@ class BaseProxy(object):
_mutex = util.ForkAwareThreadLock()
def __init__(self, token, serializer, manager=None,
- authkey=None, exposed=None, incref=True):
+ authkey=None, exposed=None, incref=True, manager_owned=False):
with BaseProxy._mutex:
tls_idset = BaseProxy._address_to_local.get(token.address, None)
if tls_idset is None:
@@ -680,6 +715,12 @@ class BaseProxy(object):
self._serializer = serializer
self._Client = listener_client[serializer][1]
+ # Should be set to True only when a proxy object is being created
+ # on the manager server; primary use case: nested proxy objects.
+ # RebuildProxy detects when a proxy is being created on the manager
+ # and sets this value appropriately.
+ self._owned_by_manager = manager_owned
+
if authkey is not None:
self._authkey = process.AuthenticationString(authkey)
elif self._manager is not None:
@@ -738,6 +779,10 @@ class BaseProxy(object):
return self._callmethod('#GETVALUE')
def _incref(self):
+ if self._owned_by_manager:
+ util.debug('owned_by_manager skipped INCREF of %r', self._token.id)
+ return
+
conn = self._Client(self._token.address, authkey=self._authkey)
dispatch(conn, None, 'incref', (self._id,))
util.debug('INCREF %r', self._token.id)
@@ -822,19 +867,19 @@ class BaseProxy(object):
def RebuildProxy(func, token, serializer, kwds):
'''
Function used for unpickling proxy objects.
-
- If possible the shared object is returned, or otherwise a proxy for it.
'''
server = getattr(process.current_process(), '_manager_server', None)
-
if server and server.address == token.address:
- return server.id_to_obj[token.id][0]
- else:
- incref = (
- kwds.pop('incref', True) and
- not getattr(process.current_process(), '_inheriting', False)
- )
- return func(token, serializer, incref=incref, **kwds)
+ util.debug('Rebuild a proxy owned by manager, token=%r', token)
+ kwds['manager_owned'] = True
+ if token.id not in server.id_to_local_proxy_obj:
+ server.id_to_local_proxy_obj[token.id] = \
+ server.id_to_obj[token.id]
+ incref = (
+ kwds.pop('incref', True) and
+ not getattr(process.current_process(), '_inheriting', False)
+ )
+ return func(token, serializer, incref=incref, **kwds)
#
# Functions to create proxies and proxy types
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py
index 6d25469..ffdf426 100644
--- a/Lib/multiprocessing/pool.py
+++ b/Lib/multiprocessing/pool.py
@@ -638,22 +638,26 @@ class MapResult(ApplyResult):
self._number_left = length//chunksize + bool(length % chunksize)
def _set(self, i, success_result):
+ self._number_left -= 1
success, result = success_result
- if success:
+ if success and self._success:
self._value[i*self._chunksize:(i+1)*self._chunksize] = result
- self._number_left -= 1
if self._number_left == 0:
if self._callback:
self._callback(self._value)
del self._cache[self._job]
self._event.set()
else:
- self._success = False
- self._value = result
- if self._error_callback:
- self._error_callback(self._value)
- del self._cache[self._job]
- self._event.set()
+ if not success and self._success:
+ # only store first exception
+ self._success = False
+ self._value = result
+ if self._number_left == 0:
+ # only consider the result ready once all jobs are done
+ if self._error_callback:
+ self._error_callback(self._value)
+ del self._cache[self._job]
+ self._event.set()
#
# Class whose instances are returned by `Pool.imap()`