summaryrefslogtreecommitdiffstats
path: root/Lib/multiprocessing/queues.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/multiprocessing/queues.py')
-rw-r--r--Lib/multiprocessing/queues.py20
1 files changed, 12 insertions, 8 deletions
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
index 51d9912..c4f9cda 100644
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -39,12 +39,12 @@ import os
import threading
import collections
import time
-import atexit
import weakref
+import errno
from queue import Empty, Full
import _multiprocessing
-from multiprocessing import Pipe
+from multiprocessing.connection import Pipe, SentinelReady
from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition
from multiprocessing.util import debug, info, Finalize, register_after_fork
from multiprocessing.forking import assert_spawning
@@ -67,6 +67,8 @@ class Queue(object):
else:
self._wlock = Lock()
self._sem = BoundedSemaphore(maxsize)
+ # For use by concurrent.futures
+ self._ignore_epipe = False
self._after_fork()
@@ -75,11 +77,11 @@ class Queue(object):
def __getstate__(self):
assert_spawning(self)
- return (self._maxsize, self._reader, self._writer,
+ return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid)
def __setstate__(self, state):
- (self._maxsize, self._reader, self._writer,
+ (self._ignore_epipe, self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid) = state
self._after_fork()
@@ -182,7 +184,7 @@ class Queue(object):
self._thread = threading.Thread(
target=Queue._feed,
args=(self._buffer, self._notempty, self._send,
- self._wlock, self._writer.close),
+ self._wlock, self._writer.close, self._ignore_epipe),
name='QueueFeederThread'
)
self._thread.daemon = True
@@ -233,7 +235,7 @@ class Queue(object):
notempty.release()
@staticmethod
- def _feed(buffer, notempty, send, writelock, close):
+ def _feed(buffer, notempty, send, writelock, close, ignore_epipe):
debug('starting thread to feed data to pipe')
from .util import is_exiting
@@ -275,6 +277,8 @@ class Queue(object):
except IndexError:
pass
except Exception as e:
+ if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE:
+ return
# Since this runs in a daemon thread the resources it uses
# may be become unusable while the process is cleaning up.
# We ignore errors which happen after the process has
@@ -376,10 +380,10 @@ class SimpleQueue(object):
def _make_methods(self):
recv = self._reader.recv
racquire, rrelease = self._rlock.acquire, self._rlock.release
- def get():
+ def get(*, sentinels=None):
racquire()
try:
- return recv()
+ return recv(sentinels)
finally:
rrelease()
self.get = get