summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDong-hee Na <donghee.na92@gmail.com>2020-04-14 16:35:36 (GMT)
committerGitHub <noreply@github.com>2020-04-14 16:35:36 (GMT)
commita5900ecf9f22e65bef489633692e9ea6941379c5 (patch)
treea4d5df2bbb90bbe501ff2d51ec7afca1392e729d
parente560f90602870601945ea7a4f7770827608817d2 (diff)
downloadcpython-a5900ecf9f22e65bef489633692e9ea6941379c5.zip
cpython-a5900ecf9f22e65bef489633692e9ea6941379c5.tar.gz
cpython-a5900ecf9f22e65bef489633692e9ea6941379c5.tar.bz2
bpo-40221: Update multiprocessing to use _at_fork_reinit (GH-19511)
-rw-r--r--Lib/multiprocessing/queues.py13
-rw-r--r--Lib/multiprocessing/resource_sharer.py6
2 files changed, 10 insertions, 9 deletions
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
index 8350701..c0a284d 100644
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -49,8 +49,7 @@ class Queue(object):
self._sem = ctx.BoundedSemaphore(maxsize)
# For use by concurrent.futures
self._ignore_epipe = False
-
- self._after_fork()
+ self._reset()
if sys.platform != 'win32':
register_after_fork(self, Queue._after_fork)
@@ -63,11 +62,17 @@ class Queue(object):
def __setstate__(self, state):
(self._ignore_epipe, self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid) = state
- self._after_fork()
+ self._reset()
def _after_fork(self):
debug('Queue._after_fork()')
- self._notempty = threading.Condition(threading.Lock())
+ self._reset(after_fork=True)
+
+ def _reset(self, after_fork=False):
+ if after_fork:
+ self._notempty._at_fork_reinit()
+ else:
+ self._notempty = threading.Condition(threading.Lock())
self._buffer = collections.deque()
self._thread = None
self._jointhread = None
diff --git a/Lib/multiprocessing/resource_sharer.py b/Lib/multiprocessing/resource_sharer.py
index 8d5c990..6607650 100644
--- a/Lib/multiprocessing/resource_sharer.py
+++ b/Lib/multiprocessing/resource_sharer.py
@@ -63,7 +63,6 @@ class _ResourceSharer(object):
def __init__(self):
self._key = 0
self._cache = {}
- self._old_locks = []
self._lock = threading.Lock()
self._listener = None
self._address = None
@@ -113,10 +112,7 @@ class _ResourceSharer(object):
for key, (send, close) in self._cache.items():
close()
self._cache.clear()
- # If self._lock was locked at the time of the fork, it may be broken
- # -- see issue 6721. Replace it without letting it be gc'ed.
- self._old_locks.append(self._lock)
- self._lock = threading.Lock()
+ self._lock._at_fork_reinit()
if self._listener is not None:
self._listener.close()
self._listener = None