summaryrefslogtreecommitdiffstats
path: root/Lib/multiprocessing
diff options
context:
space:
mode:
authorGéry Ogam <gery.ogam@gmail.com>2022-05-03 23:49:57 (GMT)
committerGitHub <noreply@github.com>2022-05-03 23:49:57 (GMT)
commitdfb1b9da8a4becaeaed3d9cffcaac41bcaf746f4 (patch)
tree230a0ede00885202ef2a2a2efd711cb35fecab56 /Lib/multiprocessing
parentf629dcfe835e349433e4c5099381d668e8fe69c8 (diff)
downloadcpython-dfb1b9da8a4becaeaed3d9cffcaac41bcaf746f4.zip
cpython-dfb1b9da8a4becaeaed3d9cffcaac41bcaf746f4.tar.gz
cpython-dfb1b9da8a4becaeaed3d9cffcaac41bcaf746f4.tar.bz2
bpo-47029: Fix BrokenPipeError in multiprocessing.Queue at garbage collection and explicit close (#31913)
Diffstat (limited to 'Lib/multiprocessing')
-rw-r--r--Lib/multiprocessing/queues.py23
1 files changed, 11 insertions, 12 deletions
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
index a290181..f37f114 100644
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -139,13 +139,10 @@ class Queue(object):
def close(self):
self._closed = True
- try:
- self._reader.close()
- finally:
- close = self._close
- if close:
- self._close = None
- close()
+ close = self._close
+ if close:
+ self._close = None
+ close()
def join_thread(self):
debug('Queue.join_thread()')
@@ -169,8 +166,9 @@ class Queue(object):
self._thread = threading.Thread(
target=Queue._feed,
args=(self._buffer, self._notempty, self._send_bytes,
- self._wlock, self._writer.close, self._ignore_epipe,
- self._on_queue_feeder_error, self._sem),
+ self._wlock, self._reader.close, self._writer.close,
+ self._ignore_epipe, self._on_queue_feeder_error,
+ self._sem),
name='QueueFeederThread'
)
self._thread.daemon = True
@@ -211,8 +209,8 @@ class Queue(object):
notempty.notify()
@staticmethod
- def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe,
- onerror, queue_sem):
+ def _feed(buffer, notempty, send_bytes, writelock, reader_close,
+ writer_close, ignore_epipe, onerror, queue_sem):
debug('starting thread to feed data to pipe')
nacquire = notempty.acquire
nrelease = notempty.release
@@ -238,7 +236,8 @@ class Queue(object):
obj = bpopleft()
if obj is sentinel:
debug('feeder thread got sentinel -- exiting')
- close()
+ reader_close()
+ writer_close()
return
# serialize the data before acquiring the lock