summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDavin Potts <python@discontinuity.net>2016-09-07 23:48:01 (GMT)
committerDavin Potts <python@discontinuity.net>2016-09-07 23:48:01 (GMT)
commit86a76684269f940a20366cb42668f1acb0982dca (patch)
treecb1e60312e2a1626fff00bda42e4163a549ba77f
parent1aa642f6bd8e7f6315721201165efa873e77259b (diff)
downloadcpython-86a76684269f940a20366cb42668f1acb0982dca.zip
cpython-86a76684269f940a20366cb42668f1acb0982dca.tar.gz
cpython-86a76684269f940a20366cb42668f1acb0982dca.tar.bz2
Fixes issue #6766: Updated multiprocessing Proxy Objects to support nesting
-rw-r--r--Doc/library/multiprocessing.rst83
-rw-r--r--Lib/multiprocessing/managers.py95
-rw-r--r--Lib/test/_test_multiprocessing.py70
3 files changed, 192 insertions, 56 deletions
diff --git a/Doc/library/multiprocessing.rst b/Doc/library/multiprocessing.rst
index f886ecb..1813eeb 100644
--- a/Doc/library/multiprocessing.rst
+++ b/Doc/library/multiprocessing.rst
@@ -1682,7 +1682,9 @@ their parent process exits. The manager classes are defined in the
of processes. Objects of this type are returned by
:func:`multiprocessing.Manager`.
- It also supports creation of shared lists and dictionaries.
+ Its methods create and return :ref:`multiprocessing-proxy_objects` for a
+ number of commonly used data types to be synchronized across processes.
+ This notably includes shared lists and dictionaries.
.. method:: Barrier(parties[, action[, timeout]])
@@ -1745,31 +1747,17 @@ their parent process exits. The manager classes are defined in the
dict(mapping)
dict(sequence)
- Create a shared ``dict`` object and return a proxy for it.
+ Create a shared :class:`dict` object and return a proxy for it.
.. method:: list()
list(sequence)
- Create a shared ``list`` object and return a proxy for it.
-
- .. note::
-
- Modifications to mutable values or items in dict and list proxies will not
- be propagated through the manager, because the proxy has no way of knowing
- when its values or items are modified. To modify such an item, you can
- re-assign the modified object to the container proxy::
-
- # create a list proxy and append a mutable object (a dictionary)
- lproxy = manager.list()
- lproxy.append({})
- # now mutate the dictionary
- d = lproxy[0]
- d['a'] = 1
- d['b'] = 2
- # at this point, the changes to d are not yet synced, but by
- # reassigning the dictionary, the proxy is notified of the change
- lproxy[0] = d
+ Create a shared :class:`list` object and return a proxy for it.
+ .. versionchanged:: 3.6
+ Shared objects are capable of being nested. For example, a shared
+ container object such as a shared list can contain other shared objects
+ which will all be managed and synchronized by the :class:`SyncManager`.
.. class:: Namespace
@@ -1881,6 +1869,8 @@ client to access it remotely::
>>> s = m.get_server()
>>> s.serve_forever()
+.. _multiprocessing-proxy_objects:
+
Proxy Objects
~~~~~~~~~~~~~
@@ -1890,8 +1880,7 @@ proxy. Multiple proxy objects may have the same referent.
A proxy object has methods which invoke corresponding methods of its referent
(although not every method of the referent will necessarily be available through
-the proxy). A proxy can usually be used in most of the same ways that its
-referent can:
+the proxy). In this way, a proxy can be used just like its referent can:
.. doctest::
@@ -1912,9 +1901,9 @@ the referent, whereas applying :func:`repr` will return the representation of
the proxy.
An important feature of proxy objects is that they are picklable so they can be
-passed between processes. Note, however, that if a proxy is sent to the
-corresponding manager's process then unpickling it will produce the referent
-itself. This means, for example, that one shared object can contain a second:
+passed between processes. As such, a referent can contain
+:ref:`multiprocessing-proxy_objects`. This permits nesting of these managed
+lists, dicts, and other :ref:`multiprocessing-proxy_objects`:
.. doctest::
@@ -1922,10 +1911,46 @@ itself. This means, for example, that one shared object can contain a second:
>>> b = manager.list()
>>> a.append(b) # referent of a now contains referent of b
>>> print(a, b)
- [[]] []
+ [<ListProxy object, typeid 'list' at ...>] []
>>> b.append('hello')
- >>> print(a, b)
- [['hello']] ['hello']
+ >>> print(a[0], b)
+ ['hello'] ['hello']
+
+Similarly, dict and list proxies may be nested inside one another::
+
+ >>> l_outer = manager.list([ manager.dict() for i in range(2) ])
+ >>> d_first_inner = l_outer[0]
+ >>> d_first_inner['a'] = 1
+ >>> d_first_inner['b'] = 2
+ >>> l_outer[1]['c'] = 3
+ >>> l_outer[1]['z'] = 26
+ >>> print(l_outer[0])
+ {'a': 1, 'b': 2}
+ >>> print(l_outer[1])
+ {'c': 3, 'z': 26}
+
+If standard (non-proxy) :class:`list` or :class:`dict` objects are contained
+in a referent, modifications to those mutable values will not be propagated
+through the manager because the proxy has no way of knowing when the values
+contained within are modified. However, storing a value in a container proxy
+(which triggers a ``__setitem__`` on the proxy object) does propagate through
+the manager and so to effectively modify such an item, one could re-assign the
+modified value to the container proxy::
+
+ # create a list proxy and append a mutable object (a dictionary)
+ lproxy = manager.list()
+ lproxy.append({})
+ # now mutate the dictionary
+ d = lproxy[0]
+ d['a'] = 1
+ d['b'] = 2
+ # at this point, the changes to d are not yet synced, but by
+ # updating the dictionary, the proxy is notified of the change
+ lproxy[0] = d
+
+This approach is perhaps less convenient than employing nested
+:ref:`multiprocessing-proxy_objects` for most use cases but also
+demonstrates a level of control over the synchronization.
.. note::
diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py
index c559b55..6e63a60 100644
--- a/Lib/multiprocessing/managers.py
+++ b/Lib/multiprocessing/managers.py
@@ -142,7 +142,8 @@ class Server(object):
self.id_to_obj = {'0': (None, ())}
self.id_to_refcount = {}
- self.mutex = threading.RLock()
+ self.id_to_local_proxy_obj = {}
+ self.mutex = threading.Lock()
def serve_forever(self):
'''
@@ -227,7 +228,14 @@ class Server(object):
methodname = obj = None
request = recv()
ident, methodname, args, kwds = request
- obj, exposed, gettypeid = id_to_obj[ident]
+ try:
+ obj, exposed, gettypeid = id_to_obj[ident]
+ except KeyError as ke:
+ try:
+ obj, exposed, gettypeid = \
+ self.id_to_local_proxy_obj[ident]
+ except KeyError as second_ke:
+ raise ke
if methodname not in exposed:
raise AttributeError(
@@ -308,7 +316,7 @@ class Server(object):
'''
with self.mutex:
result = []
- keys = list(self.id_to_obj.keys())
+ keys = list(self.id_to_refcount.keys())
keys.sort()
for ident in keys:
if ident != '0':
@@ -321,7 +329,8 @@ class Server(object):
'''
Number of shared objects
'''
- return len(self.id_to_obj) - 1 # don't count ident='0'
+ # Doesn't use (len(self.id_to_obj) - 1) as we shouldn't count ident='0'
+ return len(self.id_to_refcount)
def shutdown(self, c):
'''
@@ -363,13 +372,9 @@ class Server(object):
self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
if ident not in self.id_to_refcount:
self.id_to_refcount[ident] = 0
- # increment the reference count immediately, to avoid
- # this object being garbage collected before a Proxy
- # object for it can be created. The caller of create()
- # is responsible for doing a decref once the Proxy object
- # has been created.
- self.incref(c, ident)
- return ident, tuple(exposed)
+
+ self.incref(c, ident)
+ return ident, tuple(exposed)
def get_methods(self, c, token):
'''
@@ -387,15 +392,45 @@ class Server(object):
def incref(self, c, ident):
with self.mutex:
- self.id_to_refcount[ident] += 1
+ try:
+ self.id_to_refcount[ident] += 1
+ except KeyError as ke:
+ # If no external references exist but an internal (to the
+ # manager) still does and a new external reference is created
+ # from it, restore the manager's tracking of it from the
+ # previously stashed internal ref.
+ if ident in self.id_to_local_proxy_obj:
+ self.id_to_refcount[ident] = 1
+ self.id_to_obj[ident] = \
+ self.id_to_local_proxy_obj[ident]
+ obj, exposed, gettypeid = self.id_to_obj[ident]
+ util.debug('Server re-enabled tracking & INCREF %r', ident)
+ else:
+ raise ke
def decref(self, c, ident):
+ if ident not in self.id_to_refcount and \
+ ident in self.id_to_local_proxy_obj:
+ util.debug('Server DECREF skipping %r', ident)
+ return
+
with self.mutex:
assert self.id_to_refcount[ident] >= 1
self.id_to_refcount[ident] -= 1
if self.id_to_refcount[ident] == 0:
- del self.id_to_obj[ident], self.id_to_refcount[ident]
- util.debug('disposing of obj with id %r', ident)
+ del self.id_to_refcount[ident]
+
+ if ident not in self.id_to_refcount:
+ # Two-step process in case the object turns out to contain other
+ # proxy objects (e.g. a managed list of managed lists).
+ # Otherwise, deleting self.id_to_obj[ident] would trigger the
+ # deleting of the stored value (another managed object) which would
+ # in turn attempt to acquire the mutex that is already held here.
+ self.id_to_obj[ident] = (None, (), None) # thread-safe
+ util.debug('disposing of obj with id %r', ident)
+ with self.mutex:
+ del self.id_to_obj[ident]
+
#
# Class to represent state of a manager
@@ -658,7 +693,7 @@ class BaseProxy(object):
_mutex = util.ForkAwareThreadLock()
def __init__(self, token, serializer, manager=None,
- authkey=None, exposed=None, incref=True):
+ authkey=None, exposed=None, incref=True, manager_owned=False):
with BaseProxy._mutex:
tls_idset = BaseProxy._address_to_local.get(token.address, None)
if tls_idset is None:
@@ -680,6 +715,12 @@ class BaseProxy(object):
self._serializer = serializer
self._Client = listener_client[serializer][1]
+ # Should be set to True only when a proxy object is being created
+ # on the manager server; primary use case: nested proxy objects.
+ # RebuildProxy detects when a proxy is being created on the manager
+ # and sets this value appropriately.
+ self._owned_by_manager = manager_owned
+
if authkey is not None:
self._authkey = process.AuthenticationString(authkey)
elif self._manager is not None:
@@ -738,6 +779,10 @@ class BaseProxy(object):
return self._callmethod('#GETVALUE')
def _incref(self):
+ if self._owned_by_manager:
+ util.debug('owned_by_manager skipped INCREF of %r', self._token.id)
+ return
+
conn = self._Client(self._token.address, authkey=self._authkey)
dispatch(conn, None, 'incref', (self._id,))
util.debug('INCREF %r', self._token.id)
@@ -822,19 +867,19 @@ class BaseProxy(object):
def RebuildProxy(func, token, serializer, kwds):
'''
Function used for unpickling proxy objects.
-
- If possible the shared object is returned, or otherwise a proxy for it.
'''
server = getattr(process.current_process(), '_manager_server', None)
-
if server and server.address == token.address:
- return server.id_to_obj[token.id][0]
- else:
- incref = (
- kwds.pop('incref', True) and
- not getattr(process.current_process(), '_inheriting', False)
- )
- return func(token, serializer, incref=incref, **kwds)
+ util.debug('Rebuild a proxy owned by manager, token=%r', token)
+ kwds['manager_owned'] = True
+ if token.id not in server.id_to_local_proxy_obj:
+ server.id_to_local_proxy_obj[token.id] = \
+ server.id_to_obj[token.id]
+ incref = (
+ kwds.pop('incref', True) and
+ not getattr(process.current_process(), '_inheriting', False)
+ )
+ return func(token, serializer, incref=incref, **kwds)
#
# Functions to create proxies and proxy types
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
index cfd801e..d88cd07 100644
--- a/Lib/test/_test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -1628,13 +1628,33 @@ class _TestContainers(BaseTestCase):
d = [a, b]
e = self.list(d)
self.assertEqual(
- e[:],
+ [element[:] for element in e],
[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
)
f = self.list([a])
a.append('hello')
- self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
+ self.assertEqual(f[0][:], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello'])
+
+ def test_list_proxy_in_list(self):
+ a = self.list([self.list(range(3)) for _i in range(3)])
+ self.assertEqual([inner[:] for inner in a], [[0, 1, 2]] * 3)
+
+ a[0][-1] = 55
+ self.assertEqual(a[0][:], [0, 1, 55])
+ for i in range(1, 3):
+ self.assertEqual(a[i][:], [0, 1, 2])
+
+ self.assertEqual(a[1].pop(), 2)
+ self.assertEqual(len(a[1]), 2)
+ for i in range(0, 3, 2):
+ self.assertEqual(len(a[i]), 3)
+
+ del a
+
+ b = self.list()
+ b.append(b)
+ del b
def test_dict(self):
d = self.dict()
@@ -1646,6 +1666,52 @@ class _TestContainers(BaseTestCase):
self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
+ def test_dict_proxy_nested(self):
+ pets = self.dict(ferrets=2, hamsters=4)
+ supplies = self.dict(water=10, feed=3)
+ d = self.dict(pets=pets, supplies=supplies)
+
+ self.assertEqual(supplies['water'], 10)
+ self.assertEqual(d['supplies']['water'], 10)
+
+ d['supplies']['blankets'] = 5
+ self.assertEqual(supplies['blankets'], 5)
+ self.assertEqual(d['supplies']['blankets'], 5)
+
+ d['supplies']['water'] = 7
+ self.assertEqual(supplies['water'], 7)
+ self.assertEqual(d['supplies']['water'], 7)
+
+ del pets
+ del supplies
+ self.assertEqual(d['pets']['ferrets'], 2)
+ d['supplies']['blankets'] = 11
+ self.assertEqual(d['supplies']['blankets'], 11)
+
+ pets = d['pets']
+ supplies = d['supplies']
+ supplies['water'] = 7
+ self.assertEqual(supplies['water'], 7)
+ self.assertEqual(d['supplies']['water'], 7)
+
+ d.clear()
+ self.assertEqual(len(d), 0)
+ self.assertEqual(supplies['water'], 7)
+ self.assertEqual(pets['hamsters'], 4)
+
+ l = self.list([pets, supplies])
+ l[0]['marmots'] = 1
+ self.assertEqual(pets['marmots'], 1)
+ self.assertEqual(l[0]['marmots'], 1)
+
+ del pets
+ del supplies
+ self.assertEqual(l[0]['marmots'], 1)
+
+ outer = self.list([[88, 99], l])
+ self.assertIsInstance(outer[0], list) # Not a ListProxy
+ self.assertEqual(outer[-1][-1]['feed'], 3)
+
def test_namespace(self):
n = self.Namespace()
n.name = 'Bob'