summaryrefslogtreecommitdiffstats
path: root/Lib/multiprocessing/queues.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/multiprocessing/queues.py')
-rw-r--r--Lib/multiprocessing/queues.py34
1 files changed, 17 insertions, 17 deletions
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
index 78cb362..ea89090 100644
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -41,9 +41,9 @@ class Queue(object):
else:
self._wlock = Lock()
self._sem = BoundedSemaphore(maxsize)
-
+
self._after_fork()
-
+
if sys.platform != 'win32':
register_after_fork(self, Queue._after_fork)
@@ -51,12 +51,12 @@ class Queue(object):
assert_spawning(self)
return (self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid)
-
+
def __setstate__(self, state):
(self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid) = state
self._after_fork()
-
+
def _after_fork(self):
debug('Queue._after_fork()')
self._notempty = threading.Condition(threading.Lock())
@@ -69,7 +69,7 @@ class Queue(object):
self._send = self._writer.send
self._recv = self._reader.recv
self._poll = self._reader.poll
-
+
def put(self, obj, block=True, timeout=None):
assert not self._closed
if not self._sem.acquire(block, timeout):
@@ -93,7 +93,7 @@ class Queue(object):
return res
finally:
self._rlock.release()
-
+
else:
if block:
deadline = time.time() + timeout
@@ -135,7 +135,7 @@ class Queue(object):
assert self._closed
if self._jointhread:
self._jointhread()
-
+
def cancel_join_thread(self):
debug('Queue.cancel_join_thread()')
self._joincancelled = True
@@ -146,7 +146,7 @@ class Queue(object):
def _start_thread(self):
debug('Queue._start_thread()')
-
+
# Start thread which transfers data from buffer to pipe
self._buffer.clear()
self._thread = threading.Thread(
@@ -174,14 +174,14 @@ class Queue(object):
[weakref.ref(self._thread)],
exitpriority=-5
)
-
+
# Send sentinel to the thread queue object when garbage collected
self._close = Finalize(
self, Queue._finalize_close,
[self._buffer, self._notempty],
exitpriority=10
)
-
+
@staticmethod
def _finalize_join(twr):
debug('joining queue thread')
@@ -191,7 +191,7 @@ class Queue(object):
debug('... queue thread joined')
else:
debug('... queue thread already dead')
-
+
@staticmethod
def _finalize_close(buffer, notempty):
debug('telling queue thread to quit')
@@ -206,7 +206,7 @@ class Queue(object):
def _feed(buffer, notempty, send, writelock, close):
debug('starting thread to feed data to pipe')
from .util import is_exiting
-
+
nacquire = notempty.acquire
nrelease = notempty.release
nwait = notempty.wait
@@ -217,7 +217,7 @@ class Queue(object):
wrelease = writelock.release
else:
wacquire = None
-
+
try:
while 1:
nacquire()
@@ -257,7 +257,7 @@ class Queue(object):
traceback.print_exc()
except Exception:
pass
-
+
_sentinel = object()
#
@@ -274,7 +274,7 @@ class JoinableQueue(Queue):
Queue.__init__(self, maxsize)
self._unfinished_tasks = Semaphore(0)
self._cond = Condition()
-
+
def __getstate__(self):
return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks)
@@ -285,7 +285,7 @@ class JoinableQueue(Queue):
def put(self, item, block=True, timeout=None):
Queue.put(self, item, block, timeout)
self._unfinished_tasks.release()
-
+
def task_done(self):
self._cond.acquire()
try:
@@ -295,7 +295,7 @@ class JoinableQueue(Queue):
self._cond.notify_all()
finally:
self._cond.release()
-
+
def join(self):
self._cond.acquire()
try: