diff options
Diffstat (limited to 'Lib/multiprocessing/queues.py')
-rw-r--r-- | Lib/multiprocessing/queues.py | 19 |
1 files changed, 12 insertions, 7 deletions
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index 51d9912..262fd85 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -39,12 +39,12 @@ import os import threading import collections import time -import atexit import weakref +import errno from queue import Empty, Full import _multiprocessing -from multiprocessing import Pipe +from multiprocessing.connection import Pipe from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition from multiprocessing.util import debug, info, Finalize, register_after_fork from multiprocessing.forking import assert_spawning @@ -67,6 +67,8 @@ class Queue(object): else: self._wlock = Lock() self._sem = BoundedSemaphore(maxsize) + # For use by concurrent.futures + self._ignore_epipe = False self._after_fork() @@ -75,11 +77,11 @@ class Queue(object): def __getstate__(self): assert_spawning(self) - return (self._maxsize, self._reader, self._writer, + return (self._ignore_epipe, self._maxsize, self._reader, self._writer, self._rlock, self._wlock, self._sem, self._opid) def __setstate__(self, state): - (self._maxsize, self._reader, self._writer, + (self._ignore_epipe, self._maxsize, self._reader, self._writer, self._rlock, self._wlock, self._sem, self._opid) = state self._after_fork() @@ -182,7 +184,7 @@ class Queue(object): self._thread = threading.Thread( target=Queue._feed, args=(self._buffer, self._notempty, self._send, - self._wlock, self._writer.close), + self._wlock, self._writer.close, self._ignore_epipe), name='QueueFeederThread' ) self._thread.daemon = True @@ -233,7 +235,7 @@ class Queue(object): notempty.release() @staticmethod - def _feed(buffer, notempty, send, writelock, close): + def _feed(buffer, notempty, send, writelock, close, ignore_epipe): debug('starting thread to feed data to pipe') from .util import is_exiting @@ -275,6 +277,8 @@ class Queue(object): except IndexError: pass except Exception as e: + if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE: + return # Since this runs in a daemon thread the resources it uses # may be become unusable while the process is cleaning up. # We ignore errors which happen after the process has @@ -356,6 +360,7 @@ class SimpleQueue(object): def __init__(self): self._reader, self._writer = Pipe(duplex=False) self._rlock = Lock() + self._poll = self._reader.poll if sys.platform == 'win32': self._wlock = None else: @@ -363,7 +368,7 @@ class SimpleQueue(object): self._make_methods() def empty(self): - return not self._reader.poll() + return not self._poll() def __getstate__(self): assert_spawning(self) |