summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Lib/multiprocessing/managers.py33
-rw-r--r--Misc/ACKS1
-rw-r--r--Misc/NEWS.d/next/Library/2022-10-15-10-18-20.gh-issue-71936.MzJjc_.rst1
3 files changed, 22 insertions, 13 deletions
diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py
index a5d2f53..040f467 100644
--- a/Lib/multiprocessing/managers.py
+++ b/Lib/multiprocessing/managers.py
@@ -759,22 +759,29 @@ class BaseProxy(object):
_address_to_local = {}
_mutex = util.ForkAwareThreadLock()
+ # Each instance gets a `_serial` number. Unlike `id(...)`, this number
+ # is never reused.
+ _next_serial = 1
+
def __init__(self, token, serializer, manager=None,
authkey=None, exposed=None, incref=True, manager_owned=False):
with BaseProxy._mutex:
- tls_idset = BaseProxy._address_to_local.get(token.address, None)
- if tls_idset is None:
- tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
- BaseProxy._address_to_local[token.address] = tls_idset
+ tls_serials = BaseProxy._address_to_local.get(token.address, None)
+ if tls_serials is None:
+ tls_serials = util.ForkAwareLocal(), ProcessLocalSet()
+ BaseProxy._address_to_local[token.address] = tls_serials
+
+ self._serial = BaseProxy._next_serial
+ BaseProxy._next_serial += 1
# self._tls is used to record the connection used by this
# thread to communicate with the manager at token.address
- self._tls = tls_idset[0]
+ self._tls = tls_serials[0]
- # self._idset is used to record the identities of all shared
- # objects for which the current process owns references and
+ # self._all_serials is a set used to record the identities of all
+ # shared objects for which the current process owns references and
# which are in the manager at token.address
- self._idset = tls_idset[1]
+ self._all_serials = tls_serials[1]
self._token = token
self._id = self._token.id
@@ -857,20 +864,20 @@ class BaseProxy(object):
dispatch(conn, None, 'incref', (self._id,))
util.debug('INCREF %r', self._token.id)
- self._idset.add(self._id)
+ self._all_serials.add(self._serial)
state = self._manager and self._manager._state
self._close = util.Finalize(
self, BaseProxy._decref,
- args=(self._token, self._authkey, state,
- self._tls, self._idset, self._Client),
+ args=(self._token, self._serial, self._authkey, state,
+ self._tls, self._all_serials, self._Client),
exitpriority=10
)
@staticmethod
- def _decref(token, authkey, state, tls, idset, _Client):
- idset.discard(token.id)
+ def _decref(token, serial, authkey, state, tls, idset, _Client):
+ idset.discard(serial)
# check whether manager is still alive
if state is None or state.value == State.STARTED:
diff --git a/Misc/ACKS b/Misc/ACKS
index dce322f..08cd293 100644
--- a/Misc/ACKS
+++ b/Misc/ACKS
@@ -733,6 +733,7 @@ Larry Hastings
Tim Hatch
Zac Hatfield-Dodds
Shane Hathaway
+Akinori Hattori
Michael Haubenwallner
Janko Hauser
Flavian Hautbois
diff --git a/Misc/NEWS.d/next/Library/2022-10-15-10-18-20.gh-issue-71936.MzJjc_.rst b/Misc/NEWS.d/next/Library/2022-10-15-10-18-20.gh-issue-71936.MzJjc_.rst
new file mode 100644
index 0000000..a0959cc
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2022-10-15-10-18-20.gh-issue-71936.MzJjc_.rst
@@ -0,0 +1 @@
+Fix a race condition in :class:`multiprocessing.pool.Pool`.