diff options
author | Petr Viktorin <encukou@gmail.com> | 2024-11-13 09:25:10 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-11-13 09:25:10 (GMT) |
commit | ba088c8f9cf7163b0f28c507cb1343befe21997e (patch) | |
tree | d22735706db59196f6cbf45d1566815279dc2801 /Lib/multiprocessing/managers.py | |
parent | 1e40c5ba47780ddd91868abb3aa064f5ba3015e4 (diff) | |
download | cpython-ba088c8f9cf7163b0f28c507cb1343befe21997e.zip cpython-ba088c8f9cf7163b0f28c507cb1343befe21997e.tar.gz cpython-ba088c8f9cf7163b0f28c507cb1343befe21997e.tar.bz2 |
gh-71936: Fix race condition in multiprocessing.Pool (GH-124973)
* gh-71936: Fix race condition in multiprocessing.Pool
Proxies of shared objects register a Finalizer in BaseProxy._incref(), and it
will call BaseProxy._decref() when it is GCed. This may cause a race condition
with Pool(maxtasksperchild=None) on Windows.
A connection would be closed, and a TypeError raised, when a GC occurs between
_ConnectionBase._check_writable() and _ConnectionBase._send_bytes() in
_ConnectionBase.send() in the second or later task, and a new object
is allocated that shares the id() of a previously deleted one.
Instead of using the id() of the token (or the proxy), use a unique,
non-reusable number.
Co-Authored-By: Akinori Hattori <hattya@gmail.com>
Diffstat (limited to 'Lib/multiprocessing/managers.py')
-rw-r--r-- | Lib/multiprocessing/managers.py | 33 |
1 files changed, 20 insertions, 13 deletions
diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py index a5d2f53..040f467 100644 --- a/Lib/multiprocessing/managers.py +++ b/Lib/multiprocessing/managers.py @@ -759,22 +759,29 @@ class BaseProxy(object): _address_to_local = {} _mutex = util.ForkAwareThreadLock() + # Each instance gets a `_serial` number. Unlike `id(...)`, this number + # is never reused. + _next_serial = 1 + def __init__(self, token, serializer, manager=None, authkey=None, exposed=None, incref=True, manager_owned=False): with BaseProxy._mutex: - tls_idset = BaseProxy._address_to_local.get(token.address, None) - if tls_idset is None: - tls_idset = util.ForkAwareLocal(), ProcessLocalSet() - BaseProxy._address_to_local[token.address] = tls_idset + tls_serials = BaseProxy._address_to_local.get(token.address, None) + if tls_serials is None: + tls_serials = util.ForkAwareLocal(), ProcessLocalSet() + BaseProxy._address_to_local[token.address] = tls_serials + + self._serial = BaseProxy._next_serial + BaseProxy._next_serial += 1 # self._tls is used to record the connection used by this # thread to communicate with the manager at token.address - self._tls = tls_idset[0] + self._tls = tls_serials[0] - # self._idset is used to record the identities of all shared - # objects for which the current process owns references and + # self._all_serials is a set used to record the identities of all + # shared objects for which the current process owns references and # which are in the manager at token.address - self._idset = tls_idset[1] + self._all_serials = tls_serials[1] self._token = token self._id = self._token.id @@ -857,20 +864,20 @@ class BaseProxy(object): dispatch(conn, None, 'incref', (self._id,)) util.debug('INCREF %r', self._token.id) - self._idset.add(self._id) + self._all_serials.add(self._serial) state = self._manager and self._manager._state self._close = util.Finalize( self, BaseProxy._decref, - args=(self._token, self._authkey, state, - self._tls, self._idset, 
self._Client), + args=(self._token, self._serial, self._authkey, state, + self._tls, self._all_serials, self._Client), exitpriority=10 ) @staticmethod - def _decref(token, authkey, state, tls, idset, _Client): - idset.discard(token.id) + def _decref(token, serial, authkey, state, tls, idset, _Client): + idset.discard(serial) # check whether manager is still alive if state is None or state.value == State.STARTED: |