diff options
author | Davin Potts <python@discontinuity.net> | 2016-09-07 23:48:01 (GMT) |
---|---|---|
committer | Davin Potts <python@discontinuity.net> | 2016-09-07 23:48:01 (GMT) |
commit | 86a76684269f940a20366cb42668f1acb0982dca (patch) | |
tree | cb1e60312e2a1626fff00bda42e4163a549ba77f | |
parent | 1aa642f6bd8e7f6315721201165efa873e77259b (diff) | |
download | cpython-86a76684269f940a20366cb42668f1acb0982dca.zip cpython-86a76684269f940a20366cb42668f1acb0982dca.tar.gz cpython-86a76684269f940a20366cb42668f1acb0982dca.tar.bz2 |
Fixes issue #6766: Updated multiprocessing Proxy Objects to support nesting
-rw-r--r-- | Doc/library/multiprocessing.rst | 83 | ||||
-rw-r--r-- | Lib/multiprocessing/managers.py | 95 | ||||
-rw-r--r-- | Lib/test/_test_multiprocessing.py | 70 |
3 files changed, 192 insertions, 56 deletions
diff --git a/Doc/library/multiprocessing.rst b/Doc/library/multiprocessing.rst index f886ecb..1813eeb 100644 --- a/Doc/library/multiprocessing.rst +++ b/Doc/library/multiprocessing.rst @@ -1682,7 +1682,9 @@ their parent process exits. The manager classes are defined in the of processes. Objects of this type are returned by :func:`multiprocessing.Manager`. - It also supports creation of shared lists and dictionaries. + Its methods create and return :ref:`multiprocessing-proxy_objects` for a + number of commonly used data types to be synchronized across processes. + This notably includes shared lists and dictionaries. .. method:: Barrier(parties[, action[, timeout]]) @@ -1745,31 +1747,17 @@ their parent process exits. The manager classes are defined in the dict(mapping) dict(sequence) - Create a shared ``dict`` object and return a proxy for it. + Create a shared :class:`dict` object and return a proxy for it. .. method:: list() list(sequence) - Create a shared ``list`` object and return a proxy for it. - - .. note:: - - Modifications to mutable values or items in dict and list proxies will not - be propagated through the manager, because the proxy has no way of knowing - when its values or items are modified. To modify such an item, you can - re-assign the modified object to the container proxy:: - - # create a list proxy and append a mutable object (a dictionary) - lproxy = manager.list() - lproxy.append({}) - # now mutate the dictionary - d = lproxy[0] - d['a'] = 1 - d['b'] = 2 - # at this point, the changes to d are not yet synced, but by - # reassigning the dictionary, the proxy is notified of the change - lproxy[0] = d + Create a shared :class:`list` object and return a proxy for it. + .. versionchanged:: 3.6 + Shared objects are capable of being nested. For example, a shared + container object such as a shared list can contain other shared objects + which will all be managed and synchronized by the :class:`SyncManager`. .. class:: Namespace @@ -1881,6 +1869,8 @@ client to access it remotely:: >>> s = m.get_server() >>> s.serve_forever() +.. _multiprocessing-proxy_objects: + Proxy Objects ~~~~~~~~~~~~~ @@ -1890,8 +1880,7 @@ proxy. Multiple proxy objects may have the same referent. A proxy object has methods which invoke corresponding methods of its referent (although not every method of the referent will necessarily be available through -the proxy). A proxy can usually be used in most of the same ways that its -referent can: +the proxy). In this way, a proxy can be used just like its referent can: .. doctest:: @@ -1912,9 +1901,9 @@ the referent, whereas applying :func:`repr` will return the representation of the proxy. An important feature of proxy objects is that they are picklable so they can be -passed between processes. Note, however, that if a proxy is sent to the -corresponding manager's process then unpickling it will produce the referent -itself. This means, for example, that one shared object can contain a second: +passed between processes. As such, a referent can contain +:ref:`multiprocessing-proxy_objects`. This permits nesting of these managed +lists, dicts, and other :ref:`multiprocessing-proxy_objects`: .. doctest:: @@ -1922,10 +1911,46 @@ itself. This means, for example, that one shared object can contain a second: >>> b = manager.list() >>> a.append(b) # referent of a now contains referent of b >>> print(a, b) - [[]] [] + [<ListProxy object, typeid 'list' at ...>] [] >>> b.append('hello') - >>> print(a, b) - [['hello']] ['hello'] + >>> print(a[0], b) + ['hello'] ['hello'] + +Similarly, dict and list proxies may be nested inside one another:: + + >>> l_outer = manager.list([ manager.dict() for i in range(2) ]) + >>> d_first_inner = l_outer[0] + >>> d_first_inner['a'] = 1 + >>> d_first_inner['b'] = 2 + >>> l_outer[1]['c'] = 3 + >>> l_outer[1]['z'] = 26 + >>> print(l_outer[0]) + {'a': 1, 'b': 2} + >>> print(l_outer[1]) + {'c': 3, 'z': 26} + +If standard (non-proxy) :class:`list` or :class:`dict` objects are contained +in a referent, modifications to those mutable values will not be propagated +through the manager because the proxy has no way of knowing when the values +contained within are modified. However, storing a value in a container proxy +(which triggers a ``__setitem__`` on the proxy object) does propagate through +the manager and so to effectively modify such an item, one could re-assign the +modified value to the container proxy:: + + # create a list proxy and append a mutable object (a dictionary) + lproxy = manager.list() + lproxy.append({}) + # now mutate the dictionary + d = lproxy[0] + d['a'] = 1 + d['b'] = 2 + # at this point, the changes to d are not yet synced, but by + # updating the dictionary, the proxy is notified of the change + lproxy[0] = d + +This approach is perhaps less convenient than employing nested +:ref:`multiprocessing-proxy_objects` for most use cases but also +demonstrates a level of control over the synchronization. .. note:: diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py index c559b55..6e63a60 100644 --- a/Lib/multiprocessing/managers.py +++ b/Lib/multiprocessing/managers.py @@ -142,7 +142,8 @@ class Server(object): self.id_to_obj = {'0': (None, ())} self.id_to_refcount = {} - self.mutex = threading.RLock() + self.id_to_local_proxy_obj = {} + self.mutex = threading.Lock() def serve_forever(self): ''' @@ -227,7 +228,14 @@ class Server(object): methodname = obj = None request = recv() ident, methodname, args, kwds = request - obj, exposed, gettypeid = id_to_obj[ident] + try: + obj, exposed, gettypeid = id_to_obj[ident] + except KeyError as ke: + try: + obj, exposed, gettypeid = \ + self.id_to_local_proxy_obj[ident] + except KeyError as second_ke: + raise ke if methodname not in exposed: raise AttributeError( @@ -308,7 +316,7 @@ class Server(object): ''' with self.mutex: result = [] - keys = list(self.id_to_obj.keys()) + keys = list(self.id_to_refcount.keys()) keys.sort() for ident in keys: if ident != '0': @@ -321,7 +329,8 @@ class Server(object): ''' Number of shared objects ''' - return len(self.id_to_obj) - 1 # don't count ident='0' + # Doesn't use (len(self.id_to_obj) - 1) as we shouldn't count ident='0' + return len(self.id_to_refcount) def shutdown(self, c): ''' @@ -363,13 +372,9 @@ class Server(object): self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid) if ident not in self.id_to_refcount: self.id_to_refcount[ident] = 0 - # increment the reference count immediately, to avoid - # this object being garbage collected before a Proxy - # object for it can be created. The caller of create() - # is responsible for doing a decref once the Proxy object - # has been created. - self.incref(c, ident) - return ident, tuple(exposed) + + self.incref(c, ident) + return ident, tuple(exposed) def get_methods(self, c, token): ''' @@ -387,15 +392,45 @@ class Server(object): def incref(self, c, ident): with self.mutex: - self.id_to_refcount[ident] += 1 + try: + self.id_to_refcount[ident] += 1 + except KeyError as ke: + # If no external references exist but an internal (to the + # manager) still does and a new external reference is created + # from it, restore the manager's tracking of it from the + # previously stashed internal ref. + if ident in self.id_to_local_proxy_obj: + self.id_to_refcount[ident] = 1 + self.id_to_obj[ident] = \ + self.id_to_local_proxy_obj[ident] + obj, exposed, gettypeid = self.id_to_obj[ident] + util.debug('Server re-enabled tracking & INCREF %r', ident) + else: + raise ke def decref(self, c, ident): + if ident not in self.id_to_refcount and \ + ident in self.id_to_local_proxy_obj: + util.debug('Server DECREF skipping %r', ident) + return + with self.mutex: assert self.id_to_refcount[ident] >= 1 self.id_to_refcount[ident] -= 1 if self.id_to_refcount[ident] == 0: - del self.id_to_obj[ident], self.id_to_refcount[ident] - util.debug('disposing of obj with id %r', ident) + del self.id_to_refcount[ident] + + if ident not in self.id_to_refcount: + # Two-step process in case the object turns out to contain other + # proxy objects (e.g. a managed list of managed lists). + # Otherwise, deleting self.id_to_obj[ident] would trigger the + # deleting of the stored value (another managed object) which would + # in turn attempt to acquire the mutex that is already held here. + self.id_to_obj[ident] = (None, (), None) # thread-safe + util.debug('disposing of obj with id %r', ident) + with self.mutex: + del self.id_to_obj[ident] + # # Class to represent state of a manager @@ -658,7 +693,7 @@ class BaseProxy(object): _mutex = util.ForkAwareThreadLock() def __init__(self, token, serializer, manager=None, - authkey=None, exposed=None, incref=True): + authkey=None, exposed=None, incref=True, manager_owned=False): with BaseProxy._mutex: tls_idset = BaseProxy._address_to_local.get(token.address, None) if tls_idset is None: @@ -680,6 +715,12 @@ class BaseProxy(object): self._serializer = serializer self._Client = listener_client[serializer][1] + # Should be set to True only when a proxy object is being created + # on the manager server; primary use case: nested proxy objects. + # RebuildProxy detects when a proxy is being created on the manager + # and sets this value appropriately. + self._owned_by_manager = manager_owned + if authkey is not None: self._authkey = process.AuthenticationString(authkey) elif self._manager is not None: @@ -738,6 +779,10 @@ class BaseProxy(object): return self._callmethod('#GETVALUE') def _incref(self): + if self._owned_by_manager: + util.debug('owned_by_manager skipped INCREF of %r', self._token.id) + return + conn = self._Client(self._token.address, authkey=self._authkey) dispatch(conn, None, 'incref', (self._id,)) util.debug('INCREF %r', self._token.id) @@ -822,19 +867,19 @@ class BaseProxy(object): def RebuildProxy(func, token, serializer, kwds): ''' Function used for unpickling proxy objects. - - If possible the shared object is returned, or otherwise a proxy for it. ''' server = getattr(process.current_process(), '_manager_server', None) - if server and server.address == token.address: - return server.id_to_obj[token.id][0] - else: - incref = ( - kwds.pop('incref', True) and - not getattr(process.current_process(), '_inheriting', False) - ) - return func(token, serializer, incref=incref, **kwds) + util.debug('Rebuild a proxy owned by manager, token=%r', token) + kwds['manager_owned'] = True + if token.id not in server.id_to_local_proxy_obj: + server.id_to_local_proxy_obj[token.id] = \ + server.id_to_obj[token.id] + incref = ( + kwds.pop('incref', True) and + not getattr(process.current_process(), '_inheriting', False) + ) + return func(token, serializer, incref=incref, **kwds) # # Functions to create proxies and proxy types diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index cfd801e..d88cd07 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -1628,13 +1628,33 @@ class _TestContainers(BaseTestCase): d = [a, b] e = self.list(d) self.assertEqual( - e[:], + [element[:] for element in e], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]] ) f = self.list([a]) a.append('hello') - self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']]) + self.assertEqual(f[0][:], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']) + + def test_list_proxy_in_list(self): + a = self.list([self.list(range(3)) for _i in range(3)]) + self.assertEqual([inner[:] for inner in a], [[0, 1, 2]] * 3) + + a[0][-1] = 55 + self.assertEqual(a[0][:], [0, 1, 55]) + for i in range(1, 3): + self.assertEqual(a[i][:], [0, 1, 2]) + + self.assertEqual(a[1].pop(), 2) + self.assertEqual(len(a[1]), 2) + for i in range(0, 3, 2): + self.assertEqual(len(a[i]), 3) + + del a + + b = self.list() + b.append(b) + del b def test_dict(self): d = self.dict() @@ -1646,6 +1666,52 @@ class _TestContainers(BaseTestCase): self.assertEqual(sorted(d.values()), [chr(i) for i in indices]) self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices]) + def test_dict_proxy_nested(self): + pets = self.dict(ferrets=2, hamsters=4) + supplies = self.dict(water=10, feed=3) + d = self.dict(pets=pets, supplies=supplies) + + self.assertEqual(supplies['water'], 10) + self.assertEqual(d['supplies']['water'], 10) + + d['supplies']['blankets'] = 5 + self.assertEqual(supplies['blankets'], 5) + self.assertEqual(d['supplies']['blankets'], 5) + + d['supplies']['water'] = 7 + self.assertEqual(supplies['water'], 7) + self.assertEqual(d['supplies']['water'], 7) + + del pets + del supplies + self.assertEqual(d['pets']['ferrets'], 2) + d['supplies']['blankets'] = 11 + self.assertEqual(d['supplies']['blankets'], 11) + + pets = d['pets'] + supplies = d['supplies'] + supplies['water'] = 7 + self.assertEqual(supplies['water'], 7) + self.assertEqual(d['supplies']['water'], 7) + + d.clear() + self.assertEqual(len(d), 0) + self.assertEqual(supplies['water'], 7) + self.assertEqual(pets['hamsters'], 4) + + l = self.list([pets, supplies]) + l[0]['marmots'] = 1 + self.assertEqual(pets['marmots'], 1) + self.assertEqual(l[0]['marmots'], 1) + + del pets + del supplies + self.assertEqual(l[0]['marmots'], 1) + + outer = self.list([[88, 99], l]) + self.assertIsInstance(outer[0], list) # Not a ListProxy + self.assertEqual(outer[-1][-1]['feed'], 3) + def test_namespace(self): n = self.Namespace() n.name = 'Bob' |