summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Lib/multiprocessing/queues.py23
-rw-r--r--Misc/NEWS.d/next/Library/2022-04-26-19-01-13.bpo-47029.qkT42X.rst4
2 files changed, 15 insertions, 12 deletions
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
index a290181..f37f114 100644
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -139,13 +139,10 @@ class Queue(object):
def close(self):
self._closed = True
- try:
- self._reader.close()
- finally:
- close = self._close
- if close:
- self._close = None
- close()
+ close = self._close
+ if close:
+ self._close = None
+ close()
def join_thread(self):
debug('Queue.join_thread()')
@@ -169,8 +166,9 @@ class Queue(object):
self._thread = threading.Thread(
target=Queue._feed,
args=(self._buffer, self._notempty, self._send_bytes,
- self._wlock, self._writer.close, self._ignore_epipe,
- self._on_queue_feeder_error, self._sem),
+ self._wlock, self._reader.close, self._writer.close,
+ self._ignore_epipe, self._on_queue_feeder_error,
+ self._sem),
name='QueueFeederThread'
)
self._thread.daemon = True
@@ -211,8 +209,8 @@ class Queue(object):
notempty.notify()
@staticmethod
- def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe,
- onerror, queue_sem):
+ def _feed(buffer, notempty, send_bytes, writelock, reader_close,
+ writer_close, ignore_epipe, onerror, queue_sem):
debug('starting thread to feed data to pipe')
nacquire = notempty.acquire
nrelease = notempty.release
@@ -238,7 +236,8 @@ class Queue(object):
obj = bpopleft()
if obj is sentinel:
debug('feeder thread got sentinel -- exiting')
- close()
+ reader_close()
+ writer_close()
return
# serialize the data before acquiring the lock
diff --git a/Misc/NEWS.d/next/Library/2022-04-26-19-01-13.bpo-47029.qkT42X.rst b/Misc/NEWS.d/next/Library/2022-04-26-19-01-13.bpo-47029.qkT42X.rst
new file mode 100644
index 0000000..cc05467
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2022-04-26-19-01-13.bpo-47029.qkT42X.rst
@@ -0,0 +1,4 @@
+Always close the read end of the pipe used by :class:`multiprocessing.Queue`
+*after* the last write of buffered data to the write end of the pipe to avoid
+:exc:`BrokenPipeError` at garbage collection and at
+:meth:`multiprocessing.Queue.close` calls. Patch by Géry Ogam.