author    | Yury Selivanov <yury@magic.io> | 2018-09-11 16:54:40 (GMT)
committer | GitHub <noreply@github.com> | 2018-09-11 16:54:40 (GMT)
commit    | 7c7605ff1133cf757cac428c483827f666c7c827 (patch)
tree      | f2ec281f9302eb4b493c34624577224c38c83949 /Doc/library/asyncio-queue.rst
parent    | 735171e33486131d93865cf851c0c3d63fffd364 (diff)
bpo-33649: First asyncio docs improvement pass (GH-9142)
Rewritten/updated sections:
* Event Loop APIs
* Transports & Protocols
* Streams
* Exceptions
* Policies
* Queues
* Subprocesses
* Platforms
Diffstat (limited to 'Doc/library/asyncio-queue.rst')
-rw-r--r-- | Doc/library/asyncio-queue.rst | 197
1 file changed, 117 insertions, 80 deletions
```diff
diff --git a/Doc/library/asyncio-queue.rst b/Doc/library/asyncio-queue.rst
index 65497f2..1e4470a 100644
--- a/Doc/library/asyncio-queue.rst
+++ b/Doc/library/asyncio-queue.rst
@@ -1,41 +1,41 @@
 .. currentmodule:: asyncio

+
+======
 Queues
 ======

-**Source code:** :source:`Lib/asyncio/queues.py`
-
-Queues:
+asyncio queues are designed to be similar to classes of the
+:mod:`queue` module. Although asyncio queues are not thread-safe,
+they are designed to be used specifically in async/await code.

-* :class:`Queue`
-* :class:`PriorityQueue`
-* :class:`LifoQueue`
+Note that methods on asyncio queues don't have a *timeout* parameter;
+use :func:`asyncio.wait_for` function to do queue operations with a
+timeout.

-asyncio queue API was designed to be close to classes of the :mod:`queue`
-module (:class:`~queue.Queue`, :class:`~queue.PriorityQueue`,
-:class:`~queue.LifoQueue`), but it has no *timeout* parameter. The
-:func:`asyncio.wait_for` function can be used to cancel a task after a timeout.
+See also the `Examples`_ section below.

 Queue
------
+=====

 .. class:: Queue(maxsize=0, \*, loop=None)

-   A queue, useful for coordinating producer and consumer coroutines.
+   A first in, first out (FIFO) queue.

-   If *maxsize* is less than or equal to zero, the queue size is infinite. If
-   it is an integer greater than ``0``, then ``await put()`` will block
-   when the queue reaches *maxsize*, until an item is removed by :meth:`get`.
+   If *maxsize* is less than or equal to zero, the queue size is
+   infinite. If it is an integer greater than ``0``, then
+   ``await put()`` blocks when the queue reaches *maxsize*
+   until an item is removed by :meth:`get`.

-   Unlike the standard library :mod:`queue`, you can reliably know this Queue's
-   size with :meth:`qsize`, since your single-threaded asyncio application won't
-   be interrupted between calling :meth:`qsize` and doing an operation on the
-   Queue.
+   Unlike the standard library threading :mod:`queue`, the size of
+   the queue is always known and can be returned by calling the
+   :meth:`qsize` method.

    This class is :ref:`not thread safe <asyncio-multithreading>`.

-   .. versionchanged:: 3.4.4
-      New :meth:`join` and :meth:`task_done` methods.
+   .. attribute:: maxsize
+
+      Number of items allowed in the queue.

    .. method:: empty()

@@ -45,26 +45,16 @@ Queue

       Return ``True`` if there are :attr:`maxsize` items in the queue.

-      .. note::
-
-         If the Queue was initialized with ``maxsize=0`` (the default), then
-         :meth:`full()` is never ``True``.
+      If the queue was initialized with ``maxsize=0`` (the default),
+      then :meth:`full()` never returns ``True``.

    .. coroutinemethod:: get()

-      Remove and return an item from the queue. If queue is empty, wait until
-      an item is available.
-
-      This method is a :ref:`coroutine <coroutine>`.
-
-      .. seealso::
-
-         The :meth:`empty` method.
+      Remove and return an item from the queue. If queue is empty,
+      wait until an item is available.

    .. method:: get_nowait()

-      Remove and return an item from the queue.
-
       Return an item if one is immediately available, else raise
       :exc:`QueueEmpty`.

@@ -72,26 +62,16 @@ Queue

       Block until all items in the queue have been gotten and processed.

-      The count of unfinished tasks goes up whenever an item is added to the
-      queue. The count goes down whenever a consumer thread calls
-      :meth:`task_done` to indicate that the item was retrieved and all work on
-      it is complete. When the count of unfinished tasks drops to zero,
-      :meth:`join` unblocks.
-
-      This method is a :ref:`coroutine <coroutine>`.
-
-      .. versionadded:: 3.4.4
+      The count of unfinished tasks goes up whenever an item is added
+      to the queue. The count goes down whenever a consumer thread calls
+      :meth:`task_done` to indicate that the item was retrieved and all
+      work on it is complete. When the count of unfinished tasks drops
+      to zero, :meth:`join` unblocks.

    .. coroutinemethod:: put(item)

-      Put an item into the queue. If the queue is full, wait until a free slot
-      is available before adding item.
-
-      This method is a :ref:`coroutine <coroutine>`.
-
-      .. seealso::
-
-         The :meth:`full` method.
+      Put an item into the queue. If the queue is full, wait until a
+      free slot is available before adding item.

    .. method:: put_nowait(item)

@@ -107,54 +87,111 @@ Queue

       Indicate that a formerly enqueued task is complete.

-      Used by queue consumers. For each :meth:`~Queue.get` used to fetch a task, a
-      subsequent call to :meth:`task_done` tells the queue that the processing
-      on the task is complete.
-
-      If a :meth:`join` is currently blocking, it will resume when all items
-      have been processed (meaning that a :meth:`task_done` call was received
-      for every item that had been :meth:`~Queue.put` into the queue).
+      Used by queue consumers. For each :meth:`~Queue.get` used to
+      fetch a task, a subsequent call to :meth:`task_done` tells the
+      queue that the processing on the task is complete.

-      Raises :exc:`ValueError` if called more times than there were items
-      placed in the queue.
+      If a :meth:`join` is currently blocking, it will resume when all
+      items have been processed (meaning that a :meth:`task_done`
+      call was received for every item that had been :meth:`~Queue.put`
+      into the queue).

-      .. versionadded:: 3.4.4
+      Raises :exc:`ValueError` if called more times than there were
+      items placed in the queue.

-   .. attribute:: maxsize
-
-      Number of items allowed in the queue.
-
-PriorityQueue
--------------
+Priority Queue
+==============

 .. class:: PriorityQueue

-   A subclass of :class:`Queue`; retrieves entries in priority order (lowest
-   first).
+   A variant of :class:`Queue`; retrieves entries in priority order
+   (lowest first).

-   Entries are typically tuples of the form: (priority number, data).
+   Entries are typically tuples of the form
+   ``(priority_number, data)``.

-LifoQueue
----------
+LIFO Queue
+==========

 .. class:: LifoQueue

-   A subclass of :class:`Queue` that retrieves most recently added entries
-   first.
+   A variant of :class:`Queue` that retrieves most recently added
+   entries first (last in, first out).

 Exceptions
-^^^^^^^^^^
+==========

 .. exception:: QueueEmpty

-   Exception raised when the :meth:`~Queue.get_nowait` method is called on a
-   :class:`Queue` object which is empty.
+   This exception is raised when the :meth:`~Queue.get_nowait` method
+   is called on an empty queue.

 .. exception:: QueueFull

-   Exception raised when the :meth:`~Queue.put_nowait` method is called on a
-   :class:`Queue` object which is full.
+   Exception raised when the :meth:`~Queue.put_nowait` method is called
+   on a queue that has reached its *maxsize*.
+
+
+Examples
+========
+
+Queues can be used to distribute workload between several
+concurrent tasks::
+
+    import asyncio
+    import random
+    import time
+
+
+    async def worker(name, queue):
+        while True:
+            # Get a "work item" out of the queue.
+            sleep_for = await queue.get()
+
+            # Sleep for the "sleep_for" seconds.
+            await asyncio.sleep(sleep_for)
+
+            # Notify the queue that the "work item" has been processed.
+            queue.task_done()
+
+            print(f'{name} has slept for {sleep_for:.2f} seconds')
+
+
+    async def main():
+        # Create a queue that we will use to store our "workload".
+        queue = asyncio.Queue()
+
+        # Generate random timings and put them into the queue.
+        total_sleep_time = 0
+        for _ in range(20):
+            sleep_for = random.uniform(0.05, 1.0)
+            total_sleep_time += sleep_for
+            queue.put_nowait(sleep_for)
+
+        # Create three worker tasks to process the queue concurrently.
+        tasks = []
+        for i in range(3):
+            task = asyncio.create_task(worker(f'worker-{i}', queue))
+            tasks.append(task)
+
+        # Wait until the queue is fully processed.
+        started_at = time.monotonic()
+        await queue.join()
+        total_slept_for = time.monotonic() - started_at
+
+        # Cancel our worker tasks.
+        for task in tasks:
+            task.cancel()
+        # Wait until all worker tasks are cancelled.
+        await asyncio.gather(*tasks, return_exceptions=True)
+
+        print('====')
+        print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
+        print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
+
+
+    asyncio.run(main())
```
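
Editor's note: the rewritten introduction in the diff above says asyncio queue methods take no *timeout* parameter and points readers to `asyncio.wait_for()` instead. A minimal sketch of that pattern follows; it is an illustration only, not part of this commit, and the one-second timeout is an arbitrary value chosen for the example.

```python
import asyncio


async def main():
    queue = asyncio.Queue()

    # asyncio.Queue.get() has no *timeout* parameter; wrap the call in
    # asyncio.wait_for() to give up if no item arrives in time.
    try:
        item = await asyncio.wait_for(queue.get(), timeout=1.0)
    except asyncio.TimeoutError:
        print('no item arrived within 1 second')
    else:
        print(f'got {item!r}')
        queue.task_done()


asyncio.run(main())
```

On timeout, `wait_for()` cancels the pending `get()` and raises `asyncio.TimeoutError`, so no item is consumed.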
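
Editor's note: the updated `PriorityQueue` entry in the diff says items are typically `(priority_number, data)` tuples retrieved lowest number first. A small sketch of that behaviour, again illustration only and not taken from the commit:

```python
import asyncio


async def main():
    queue = asyncio.PriorityQueue()

    # Entries are (priority_number, data) tuples; the lowest
    # priority number is retrieved first.
    queue.put_nowait((3, 'low'))
    queue.put_nowait((1, 'high'))
    queue.put_nowait((2, 'medium'))

    while not queue.empty():
        priority, data = await queue.get()
        print(priority, data)  # prints 1 high, then 2 medium, then 3 low


asyncio.run(main())
```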