diff options
author | Géry Ogam <gery.ogam@gmail.com> | 2022-05-03 23:49:57 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-05-03 23:49:57 (GMT) |
commit | dfb1b9da8a4becaeaed3d9cffcaac41bcaf746f4 (patch) | |
tree | 230a0ede00885202ef2a2a2efd711cb35fecab56 /Lib/multiprocessing | |
parent | f629dcfe835e349433e4c5099381d668e8fe69c8 (diff) | |
download | cpython-dfb1b9da8a4becaeaed3d9cffcaac41bcaf746f4.zip cpython-dfb1b9da8a4becaeaed3d9cffcaac41bcaf746f4.tar.gz cpython-dfb1b9da8a4becaeaed3d9cffcaac41bcaf746f4.tar.bz2 |
bpo-47029: Fix BrokenPipeError in multiprocessing.Queue at garbage collection and explicit close (#31913)
Diffstat (limited to 'Lib/multiprocessing')
-rw-r--r-- | Lib/multiprocessing/queues.py | 23 |
1 files changed, 11 insertions, 12 deletions
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index a290181..f37f114 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -139,13 +139,10 @@ class Queue(object): def close(self): self._closed = True - try: - self._reader.close() - finally: - close = self._close - if close: - self._close = None - close() + close = self._close + if close: + self._close = None + close() def join_thread(self): debug('Queue.join_thread()') @@ -169,8 +166,9 @@ class Queue(object): self._thread = threading.Thread( target=Queue._feed, args=(self._buffer, self._notempty, self._send_bytes, - self._wlock, self._writer.close, self._ignore_epipe, - self._on_queue_feeder_error, self._sem), + self._wlock, self._reader.close, self._writer.close, + self._ignore_epipe, self._on_queue_feeder_error, + self._sem), name='QueueFeederThread' ) self._thread.daemon = True @@ -211,8 +209,8 @@ class Queue(object): notempty.notify() @staticmethod - def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe, - onerror, queue_sem): + def _feed(buffer, notempty, send_bytes, writelock, reader_close, + writer_close, ignore_epipe, onerror, queue_sem): debug('starting thread to feed data to pipe') nacquire = notempty.acquire nrelease = notempty.release @@ -238,7 +236,8 @@ class Queue(object): obj = bpopleft() if obj is sentinel: debug('feeder thread got sentinel -- exiting') - close() + reader_close() + writer_close() return # serialize the data before acquiring the lock |