diff options
-rw-r--r-- | Lib/multiprocessing/queues.py | 23 | ||||
-rw-r--r-- | Misc/NEWS.d/next/Library/2022-04-26-19-01-13.bpo-47029.qkT42X.rst | 4 |
2 files changed, 15 insertions, 12 deletions
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index a290181..f37f114 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -139,13 +139,10 @@ class Queue(object): def close(self): self._closed = True - try: - self._reader.close() - finally: - close = self._close - if close: - self._close = None - close() + close = self._close + if close: + self._close = None + close() def join_thread(self): debug('Queue.join_thread()') @@ -169,8 +166,9 @@ class Queue(object): self._thread = threading.Thread( target=Queue._feed, args=(self._buffer, self._notempty, self._send_bytes, - self._wlock, self._writer.close, self._ignore_epipe, - self._on_queue_feeder_error, self._sem), + self._wlock, self._reader.close, self._writer.close, + self._ignore_epipe, self._on_queue_feeder_error, + self._sem), name='QueueFeederThread' ) self._thread.daemon = True @@ -211,8 +209,8 @@ class Queue(object): notempty.notify() @staticmethod - def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe, - onerror, queue_sem): + def _feed(buffer, notempty, send_bytes, writelock, reader_close, + writer_close, ignore_epipe, onerror, queue_sem): debug('starting thread to feed data to pipe') nacquire = notempty.acquire nrelease = notempty.release @@ -238,7 +236,8 @@ class Queue(object): obj = bpopleft() if obj is sentinel: debug('feeder thread got sentinel -- exiting') - close() + reader_close() + writer_close() return # serialize the data before acquiring the lock diff --git a/Misc/NEWS.d/next/Library/2022-04-26-19-01-13.bpo-47029.qkT42X.rst b/Misc/NEWS.d/next/Library/2022-04-26-19-01-13.bpo-47029.qkT42X.rst new file mode 100644 index 0000000..cc05467 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2022-04-26-19-01-13.bpo-47029.qkT42X.rst @@ -0,0 +1,4 @@ +Always close the read end of the pipe used by :class:`multiprocessing.Queue` +*after* the last write of buffered data to the write end of the pipe to avoid +:exc:`BrokenPipeError` at garbage collection and at +:meth:`multiprocessing.Queue.close` calls. Patch by Géry Ogam. |