diff options
Diffstat (limited to 'Lib/multiprocessing/queues.py')
-rw-r--r-- | Lib/multiprocessing/queues.py | 356 |
1 files changed, 356 insertions, 0 deletions
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py new file mode 100644 index 0000000..fb6cb6d --- /dev/null +++ b/Lib/multiprocessing/queues.py @@ -0,0 +1,356 @@ +# +# Module implementing queues +# +# multiprocessing/queues.py +# +# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt +# + +__all__ = ['Queue', 'SimpleQueue'] + +import sys +import os +import threading +import collections +import time +import atexit +import weakref + +from queue import Empty, Full +import _multiprocessing +from multiprocessing import Pipe +from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition +from multiprocessing.util import debug, info, Finalize, register_after_fork +from multiprocessing.forking import assert_spawning + +# +# Queue type using a pipe, buffer and thread +# + +class Queue(object): + + def __init__(self, maxsize=0): + if maxsize <= 0: + maxsize = _multiprocessing.SemLock.SEM_VALUE_MAX + self._maxsize = maxsize + self._reader, self._writer = Pipe(duplex=False) + self._rlock = Lock() + self._opid = os.getpid() + if sys.platform == 'win32': + self._wlock = None + else: + self._wlock = Lock() + self._sem = BoundedSemaphore(maxsize) + + self._after_fork() + + if sys.platform != 'win32': + register_after_fork(self, Queue._after_fork) + + def __getstate__(self): + assert_spawning(self) + return (self._maxsize, self._reader, self._writer, + self._rlock, self._wlock, self._sem, self._opid) + + def __setstate__(self, state): + (self._maxsize, self._reader, self._writer, + self._rlock, self._wlock, self._sem, self._opid) = state + self._after_fork() + + def _after_fork(self): + debug('Queue._after_fork()') + self._notempty = threading.Condition(threading.Lock()) + self._buffer = collections.deque() + self._thread = None + self._jointhread = None + self._joincancelled = False + self._closed = False + self._close = None + self._send = self._writer.send + self._recv = self._reader.recv + self._poll = self._reader.poll + + def put(self, obj, block=True, timeout=None): + assert not self._closed + if not self._sem.acquire(block, timeout): + raise Full + + self._notempty.acquire() + try: + if self._thread is None: + self._start_thread() + self._buffer.append(obj) + self._notempty.notify() + finally: + self._notempty.release() + + def get(self, block=True, timeout=None): + if block and timeout is None: + self._rlock.acquire() + try: + res = self._recv() + self._sem.release() + return res + finally: + self._rlock.release() + + else: + if block: + deadline = time.time() + timeout + if not self._rlock.acquire(block, timeout): + raise Empty + try: + if not self._poll(block and (deadline-time.time()) or 0.0): + raise Empty + res = self._recv() + self._sem.release() + return res + finally: + self._rlock.release() + + def qsize(self): + # Raises NotImplementError on Mac OSX because of broken sem_getvalue() + return self._maxsize - self._sem._semlock._get_value() + + def empty(self): + return not self._poll() + + def full(self): + return self._sem._semlock._is_zero() + + def get_nowait(self): + return self.get(False) + + def put_nowait(self, obj): + return self.put(obj, False) + + def close(self): + self._closed = True + self._reader.close() + if self._close: + self._close() + + def join_thread(self): + debug('Queue.join_thread()') + assert self._closed + if self._jointhread: + self._jointhread() + + def cancel_join_thread(self): + debug('Queue.cancel_join_thread()') + self._joincancelled = True + try: + self._jointhread.cancel() + except AttributeError: + pass + + def _start_thread(self): + debug('Queue._start_thread()') + + # Start thread which transfers data from buffer to pipe + self._buffer.clear() + self._thread = threading.Thread( + target=Queue._feed, + args=(self._buffer, self._notempty, self._send, + self._wlock, self._writer.close), + name='QueueFeederThread' + ) + self._thread.setDaemon(True) + + debug('doing self._thread.start()') + self._thread.start() + debug('... done self._thread.start()') + + # On process exit we will wait for data to be flushed to pipe. + # + # However, if this process created the queue then all + # processes which use the queue will be descendants of this + # process. Therefore waiting for the queue to be flushed + # is pointless once all the child processes have been joined. + created_by_this_process = (self._opid == os.getpid()) + if not self._joincancelled and not created_by_this_process: + self._jointhread = Finalize( + self._thread, Queue._finalize_join, + [weakref.ref(self._thread)], + exitpriority=-5 + ) + + # Send sentinel to the thread queue object when garbage collected + self._close = Finalize( + self, Queue._finalize_close, + [self._buffer, self._notempty], + exitpriority=10 + ) + + @staticmethod + def _finalize_join(twr): + debug('joining queue thread') + thread = twr() + if thread is not None: + thread.join() + debug('... queue thread joined') + else: + debug('... queue thread already dead') + + @staticmethod + def _finalize_close(buffer, notempty): + debug('telling queue thread to quit') + notempty.acquire() + try: + buffer.append(_sentinel) + notempty.notify() + finally: + notempty.release() + + @staticmethod + def _feed(buffer, notempty, send, writelock, close): + debug('starting thread to feed data to pipe') + from .util import is_exiting + + nacquire = notempty.acquire + nrelease = notempty.release + nwait = notempty.wait + bpopleft = buffer.popleft + sentinel = _sentinel + if sys.platform != 'win32': + wacquire = writelock.acquire + wrelease = writelock.release + else: + wacquire = None + + try: + while 1: + nacquire() + try: + if not buffer: + nwait() + finally: + nrelease() + try: + while 1: + obj = bpopleft() + if obj is sentinel: + debug('feeder thread got sentinel -- exiting') + close() + return + + if wacquire is None: + send(obj) + else: + wacquire() + try: + send(obj) + finally: + wrelease() + except IndexError: + pass + except Exception as e: + # Since this runs in a daemon thread the resources it uses + # may be become unusable while the process is cleaning up. + # We ignore errors which happen after the process has + # started to cleanup. + try: + if is_exiting(): + info('error in queue thread: %s', e) + else: + import traceback + traceback.print_exc() + except Exception: + pass + +_sentinel = object() + +# +# A queue type which also supports join() and task_done() methods +# +# Note that if you do not call task_done() for each finished task then +# eventually the counter's semaphore may overflow causing Bad Things +# to happen. +# + +class JoinableQueue(Queue): + + def __init__(self, maxsize=0): + Queue.__init__(self, maxsize) + self._unfinished_tasks = Semaphore(0) + self._cond = Condition() + + def __getstate__(self): + return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks) + + def __setstate__(self, state): + Queue.__setstate__(self, state[:-2]) + self._cond, self._unfinished_tasks = state[-2:] + + def put(self, item, block=True, timeout=None): + Queue.put(self, item, block, timeout) + self._unfinished_tasks.release() + + def task_done(self): + self._cond.acquire() + try: + if not self._unfinished_tasks.acquire(False): + raise ValueError('task_done() called too many times') + if self._unfinished_tasks._semlock._is_zero(): + self._cond.notify_all() + finally: + self._cond.release() + + def join(self): + self._cond.acquire() + try: + if not self._unfinished_tasks._semlock._is_zero(): + self._cond.wait() + finally: + self._cond.release() + +# +# Simplified Queue type -- really just a locked pipe +# + +class SimpleQueue(object): + + def __init__(self): + self._reader, self._writer = Pipe(duplex=False) + self._rlock = Lock() + if sys.platform == 'win32': + self._wlock = None + else: + self._wlock = Lock() + self._make_methods() + + def empty(self): + return not self._reader.poll() + + def __getstate__(self): + assert_spawning(self) + return (self._reader, self._writer, self._rlock, self._wlock) + + def __setstate__(self, state): + (self._reader, self._writer, self._rlock, self._wlock) = state + self._make_methods() + + def _make_methods(self): + recv = self._reader.recv + racquire, rrelease = self._rlock.acquire, self._rlock.release + def get(): + racquire() + try: + return recv() + finally: + rrelease() + self.get = get + + if self._wlock is None: + # writes to a message oriented win32 pipe are atomic + self.put = self._writer.send + else: + send = self._writer.send + wacquire, wrelease = self._wlock.acquire, self._wlock.release + def put(obj): + wacquire() + try: + return send(obj) + finally: + wrelease() + self.put = put |