diff options
| author | Guido van Rossum <guido@dropbox.com> | 2013-10-17 20:40:50 (GMT) | 
|---|---|---|
| committer | Guido van Rossum <guido@dropbox.com> | 2013-10-17 20:40:50 (GMT) | 
| commit | 27b7c7ebf1039e96cac41b6330cf16b5632d9e49 (patch) | |
| tree | 814505b0f9d02a5cabdec733dcde70250b04ee28 /Lib/asyncio/queues.py | |
| parent | 5b37f97ea5ac9f6b33b0e0269c69539cbb478142 (diff) | |
| download | cpython-27b7c7ebf1039e96cac41b6330cf16b5632d9e49.zip cpython-27b7c7ebf1039e96cac41b6330cf16b5632d9e49.tar.gz cpython-27b7c7ebf1039e96cac41b6330cf16b5632d9e49.tar.bz2  | |
Initial checkin of asyncio package (== Tulip, == PEP 3156).
Diffstat (limited to 'Lib/asyncio/queues.py')
| -rw-r--r-- | Lib/asyncio/queues.py | 284 | 
1 files changed, 284 insertions, 0 deletions
diff --git a/Lib/asyncio/queues.py b/Lib/asyncio/queues.py new file mode 100644 index 0000000..536de1c --- /dev/null +++ b/Lib/asyncio/queues.py @@ -0,0 +1,284 @@ +"""Queues""" + +__all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'JoinableQueue', +           'Full', 'Empty'] + +import collections +import heapq +import queue + +from . import events +from . import futures +from . import locks +from .tasks import coroutine + + +# Re-export queue.Full and .Empty exceptions. +Full = queue.Full +Empty = queue.Empty + + +class Queue: +    """A queue, useful for coordinating producer and consumer coroutines. + +    If maxsize is less than or equal to zero, the queue size is infinite. If it +    is an integer greater than 0, then "yield from put()" will block when the +    queue reaches maxsize, until an item is removed by get(). + +    Unlike the standard library Queue, you can reliably know this Queue's size +    with qsize(), since your single-threaded Tulip application won't be +    interrupted between calling qsize() and doing an operation on the Queue. +    """ + +    def __init__(self, maxsize=0, *, loop=None): +        if loop is None: +            self._loop = events.get_event_loop() +        else: +            self._loop = loop +        self._maxsize = maxsize + +        # Futures. +        self._getters = collections.deque() +        # Pairs of (item, Future). +        self._putters = collections.deque() +        self._init(maxsize) + +    def _init(self, maxsize): +        self._queue = collections.deque() + +    def _get(self): +        return self._queue.popleft() + +    def _put(self, item): +        self._queue.append(item) + +    def __repr__(self): +        return '<{} at {:#x} {}>'.format( +            type(self).__name__, id(self), self._format()) + +    def __str__(self): +        return '<{} {}>'.format(type(self).__name__, self._format()) + +    def _format(self): +        result = 'maxsize={!r}'.format(self._maxsize) +        if getattr(self, '_queue', None): +            result += ' _queue={!r}'.format(list(self._queue)) +        if self._getters: +            result += ' _getters[{}]'.format(len(self._getters)) +        if self._putters: +            result += ' _putters[{}]'.format(len(self._putters)) +        return result + +    def _consume_done_getters(self): +        # Delete waiters at the head of the get() queue who've timed out. +        while self._getters and self._getters[0].done(): +            self._getters.popleft() + +    def _consume_done_putters(self): +        # Delete waiters at the head of the put() queue who've timed out. +        while self._putters and self._putters[0][1].done(): +            self._putters.popleft() + +    def qsize(self): +        """Number of items in the queue.""" +        return len(self._queue) + +    @property +    def maxsize(self): +        """Number of items allowed in the queue.""" +        return self._maxsize + +    def empty(self): +        """Return True if the queue is empty, False otherwise.""" +        return not self._queue + +    def full(self): +        """Return True if there are maxsize items in the queue. + +        Note: if the Queue was initialized with maxsize=0 (the default), +        then full() is never True. +        """ +        if self._maxsize <= 0: +            return False +        else: +            return self.qsize() == self._maxsize + +    @coroutine +    def put(self, item): +        """Put an item into the queue. + +        If you yield from put(), wait until a free slot is available +        before adding item. +        """ +        self._consume_done_getters() +        if self._getters: +            assert not self._queue, ( +                'queue non-empty, why are getters waiting?') + +            getter = self._getters.popleft() + +            # Use _put and _get instead of passing item straight to getter, in +            # case a subclass has logic that must run (e.g. JoinableQueue). +            self._put(item) +            getter.set_result(self._get()) + +        elif self._maxsize > 0 and self._maxsize == self.qsize(): +            waiter = futures.Future(loop=self._loop) + +            self._putters.append((item, waiter)) +            yield from waiter + +        else: +            self._put(item) + +    def put_nowait(self, item): +        """Put an item into the queue without blocking. + +        If no free slot is immediately available, raise Full. +        """ +        self._consume_done_getters() +        if self._getters: +            assert not self._queue, ( +                'queue non-empty, why are getters waiting?') + +            getter = self._getters.popleft() + +            # Use _put and _get instead of passing item straight to getter, in +            # case a subclass has logic that must run (e.g. JoinableQueue). +            self._put(item) +            getter.set_result(self._get()) + +        elif self._maxsize > 0 and self._maxsize == self.qsize(): +            raise Full +        else: +            self._put(item) + +    @coroutine +    def get(self): +        """Remove and return an item from the queue. + +        If you yield from get(), wait until a item is available. +        """ +        self._consume_done_putters() +        if self._putters: +            assert self.full(), 'queue not full, why are putters waiting?' +            item, putter = self._putters.popleft() +            self._put(item) + +            # When a getter runs and frees up a slot so this putter can +            # run, we need to defer the put for a tick to ensure that +            # getters and putters alternate perfectly. See +            # ChannelTest.test_wait. +            self._loop.call_soon(putter.set_result, None) + +            return self._get() + +        elif self.qsize(): +            return self._get() +        else: +            waiter = futures.Future(loop=self._loop) + +            self._getters.append(waiter) +            return (yield from waiter) + +    def get_nowait(self): +        """Remove and return an item from the queue. + +        Return an item if one is immediately available, else raise Full. +        """ +        self._consume_done_putters() +        if self._putters: +            assert self.full(), 'queue not full, why are putters waiting?' +            item, putter = self._putters.popleft() +            self._put(item) +            # Wake putter on next tick. +            putter.set_result(None) + +            return self._get() + +        elif self.qsize(): +            return self._get() +        else: +            raise Empty + + +class PriorityQueue(Queue): +    """A subclass of Queue; retrieves entries in priority order (lowest first). + +    Entries are typically tuples of the form: (priority number, data). +    """ + +    def _init(self, maxsize): +        self._queue = [] + +    def _put(self, item, heappush=heapq.heappush): +        heappush(self._queue, item) + +    def _get(self, heappop=heapq.heappop): +        return heappop(self._queue) + + +class LifoQueue(Queue): +    """A subclass of Queue that retrieves most recently added entries first.""" + +    def _init(self, maxsize): +        self._queue = [] + +    def _put(self, item): +        self._queue.append(item) + +    def _get(self): +        return self._queue.pop() + + +class JoinableQueue(Queue): +    """A subclass of Queue with task_done() and join() methods.""" + +    def __init__(self, maxsize=0, *, loop=None): +        super().__init__(maxsize=maxsize, loop=loop) +        self._unfinished_tasks = 0 +        self._finished = locks.Event(loop=self._loop) +        self._finished.set() + +    def _format(self): +        result = Queue._format(self) +        if self._unfinished_tasks: +            result += ' tasks={}'.format(self._unfinished_tasks) +        return result + +    def _put(self, item): +        super()._put(item) +        self._unfinished_tasks += 1 +        self._finished.clear() + +    def task_done(self): +        """Indicate that a formerly enqueued task is complete. + +        Used by queue consumers. For each get() used to fetch a task, +        a subsequent call to task_done() tells the queue that the processing +        on the task is complete. + +        If a join() is currently blocking, it will resume when all items have +        been processed (meaning that a task_done() call was received for every +        item that had been put() into the queue). + +        Raises ValueError if called more times than there were items placed in +        the queue. +        """ +        if self._unfinished_tasks <= 0: +            raise ValueError('task_done() called too many times') +        self._unfinished_tasks -= 1 +        if self._unfinished_tasks == 0: +            self._finished.set() + +    @coroutine +    def join(self): +        """Block until all items in the queue have been gotten and processed. + +        The count of unfinished tasks goes up whenever an item is added to the +        queue. The count goes down whenever a consumer thread calls task_done() +        to indicate that the item was retrieved and all work on it is complete. +        When the count of unfinished tasks drops to zero, join() unblocks. +        """ +        if self._unfinished_tasks > 0: +            yield from self._finished.wait()  | 
