summaryrefslogtreecommitdiffstats
path: root/Doc/library/asyncio-queue.rst
diff options
context:
space:
mode:
authorYury Selivanov <yury@magic.io>2018-09-11 16:54:40 (GMT)
committerGitHub <noreply@github.com>2018-09-11 16:54:40 (GMT)
commit7c7605ff1133cf757cac428c483827f666c7c827 (patch)
treef2ec281f9302eb4b493c34624577224c38c83949 /Doc/library/asyncio-queue.rst
parent735171e33486131d93865cf851c0c3d63fffd364 (diff)
downloadcpython-7c7605ff1133cf757cac428c483827f666c7c827.zip
cpython-7c7605ff1133cf757cac428c483827f666c7c827.tar.gz
cpython-7c7605ff1133cf757cac428c483827f666c7c827.tar.bz2
bpo-33649: First asyncio docs improvement pass (GH-9142)
Rewritten/updated sections: * Event Loop APIs * Transports & Protocols * Streams * Exceptions * Policies * Queues * Subprocesses * Platforms
Diffstat (limited to 'Doc/library/asyncio-queue.rst')
-rw-r--r--Doc/library/asyncio-queue.rst197
1 files changed, 117 insertions, 80 deletions
diff --git a/Doc/library/asyncio-queue.rst b/Doc/library/asyncio-queue.rst
index 65497f2..1e4470a 100644
--- a/Doc/library/asyncio-queue.rst
+++ b/Doc/library/asyncio-queue.rst
@@ -1,41 +1,41 @@
.. currentmodule:: asyncio
+
+======
Queues
======
-**Source code:** :source:`Lib/asyncio/queues.py`
-
-Queues:
+asyncio queues are designed to be similar to classes of the
+:mod:`queue` module. Although asyncio queues are not thread-safe,
+they are designed to be used specifically in async/await code.
-* :class:`Queue`
-* :class:`PriorityQueue`
-* :class:`LifoQueue`
+Note that methods on asyncio queues don't have a *timeout* parameter;
+use :func:`asyncio.wait_for` function to do queue operations with a
+timeout.
-asyncio queue API was designed to be close to classes of the :mod:`queue`
-module (:class:`~queue.Queue`, :class:`~queue.PriorityQueue`,
-:class:`~queue.LifoQueue`), but it has no *timeout* parameter. The
-:func:`asyncio.wait_for` function can be used to cancel a task after a timeout.
+See also the `Examples`_ section below.
Queue
------
+=====
.. class:: Queue(maxsize=0, \*, loop=None)
- A queue, useful for coordinating producer and consumer coroutines.
+ A first in, first out (FIFO) queue.
- If *maxsize* is less than or equal to zero, the queue size is infinite. If
- it is an integer greater than ``0``, then ``await put()`` will block
- when the queue reaches *maxsize*, until an item is removed by :meth:`get`.
+ If *maxsize* is less than or equal to zero, the queue size is
+ infinite. If it is an integer greater than ``0``, then
+ ``await put()`` blocks when the queue reaches *maxsize*
+ until an item is removed by :meth:`get`.
- Unlike the standard library :mod:`queue`, you can reliably know this Queue's
- size with :meth:`qsize`, since your single-threaded asyncio application won't
- be interrupted between calling :meth:`qsize` and doing an operation on the
- Queue.
+ Unlike the standard library threading :mod:`queue`, the size of
+ the queue is always known and can be returned by calling the
+ :meth:`qsize` method.
This class is :ref:`not thread safe <asyncio-multithreading>`.
- .. versionchanged:: 3.4.4
- New :meth:`join` and :meth:`task_done` methods.
+ .. attribute:: maxsize
+
+ Number of items allowed in the queue.
.. method:: empty()
@@ -45,26 +45,16 @@ Queue
Return ``True`` if there are :attr:`maxsize` items in the queue.
- .. note::
-
- If the Queue was initialized with ``maxsize=0`` (the default), then
- :meth:`full()` is never ``True``.
+ If the queue was initialized with ``maxsize=0`` (the default),
+ then :meth:`full()` never returns ``True``.
.. coroutinemethod:: get()
- Remove and return an item from the queue. If queue is empty, wait until
- an item is available.
-
- This method is a :ref:`coroutine <coroutine>`.
-
- .. seealso::
-
- The :meth:`empty` method.
+ Remove and return an item from the queue. If queue is empty,
+ wait until an item is available.
.. method:: get_nowait()
- Remove and return an item from the queue.
-
Return an item if one is immediately available, else raise
:exc:`QueueEmpty`.
@@ -72,26 +62,16 @@ Queue
Block until all items in the queue have been gotten and processed.
- The count of unfinished tasks goes up whenever an item is added to the
- queue. The count goes down whenever a consumer thread calls
- :meth:`task_done` to indicate that the item was retrieved and all work on
- it is complete. When the count of unfinished tasks drops to zero,
- :meth:`join` unblocks.
-
- This method is a :ref:`coroutine <coroutine>`.
-
- .. versionadded:: 3.4.4
+ The count of unfinished tasks goes up whenever an item is added
+ to the queue. The count goes down whenever a consumer thread calls
+ :meth:`task_done` to indicate that the item was retrieved and all
+ work on it is complete. When the count of unfinished tasks drops
+ to zero, :meth:`join` unblocks.
.. coroutinemethod:: put(item)
- Put an item into the queue. If the queue is full, wait until a free slot
- is available before adding item.
-
- This method is a :ref:`coroutine <coroutine>`.
-
- .. seealso::
-
- The :meth:`full` method.
+ Put an item into the queue. If the queue is full, wait until a
+ free slot is available before adding item.
.. method:: put_nowait(item)
@@ -107,54 +87,111 @@ Queue
Indicate that a formerly enqueued task is complete.
- Used by queue consumers. For each :meth:`~Queue.get` used to fetch a task, a
- subsequent call to :meth:`task_done` tells the queue that the processing
- on the task is complete.
-
- If a :meth:`join` is currently blocking, it will resume when all items
- have been processed (meaning that a :meth:`task_done` call was received
- for every item that had been :meth:`~Queue.put` into the queue).
+ Used by queue consumers. For each :meth:`~Queue.get` used to
+ fetch a task, a subsequent call to :meth:`task_done` tells the
+ queue that the processing on the task is complete.
- Raises :exc:`ValueError` if called more times than there were items
- placed in the queue.
+ If a :meth:`join` is currently blocking, it will resume when all
+ items have been processed (meaning that a :meth:`task_done`
+ call was received for every item that had been :meth:`~Queue.put`
+ into the queue).
- .. versionadded:: 3.4.4
+ Raises :exc:`ValueError` if called more times than there were
+ items placed in the queue.
- .. attribute:: maxsize
-
- Number of items allowed in the queue.
-
-PriorityQueue
--------------
+Priority Queue
+==============
.. class:: PriorityQueue
- A subclass of :class:`Queue`; retrieves entries in priority order (lowest
- first).
+ A variant of :class:`Queue`; retrieves entries in priority order
+ (lowest first).
- Entries are typically tuples of the form: (priority number, data).
+ Entries are typically tuples of the form
+ ``(priority_number, data)``.
-LifoQueue
----------
+LIFO Queue
+==========
.. class:: LifoQueue
- A subclass of :class:`Queue` that retrieves most recently added entries
- first.
+ A variant of :class:`Queue` that retrieves most recently added
+ entries first (last in, first out).
Exceptions
-^^^^^^^^^^
+==========
.. exception:: QueueEmpty
- Exception raised when the :meth:`~Queue.get_nowait` method is called on a
- :class:`Queue` object which is empty.
+ This exception is raised when the :meth:`~Queue.get_nowait` method
+ is called on an empty queue.
.. exception:: QueueFull
- Exception raised when the :meth:`~Queue.put_nowait` method is called on a
- :class:`Queue` object which is full.
+ Exception raised when the :meth:`~Queue.put_nowait` method is called
+ on a queue that has reached its *maxsize*.
+
+
+Examples
+========
+
+Queues can be used to distribute workload between several
+concurrent tasks::
+
+ import asyncio
+ import random
+ import time
+
+
+ async def worker(name, queue):
+ while True:
+ # Get a "work item" out of the queue.
+ sleep_for = await queue.get()
+
+ # Sleep for the "sleep_for" seconds.
+ await asyncio.sleep(sleep_for)
+
+ # Notify the queue that the "work item" has been processed.
+ queue.task_done()
+
+ print(f'{name} has slept for {sleep_for:.2f} seconds')
+
+
+ async def main():
+ # Create a queue that we will use to store our "workload".
+ queue = asyncio.Queue()
+
+ # Generate random timings and put them into the queue.
+ total_sleep_time = 0
+ for _ in range(20):
+ sleep_for = random.uniform(0.05, 1.0)
+ total_sleep_time += sleep_for
+ queue.put_nowait(sleep_for)
+
+ # Create three worker tasks to process the queue concurrently.
+ tasks = []
+ for i in range(3):
+ task = asyncio.create_task(worker(f'worker-{i}', queue))
+ tasks.append(task)
+
+ # Wait until the queue is fully processed.
+ started_at = time.monotonic()
+ await queue.join()
+ total_slept_for = time.monotonic() - started_at
+
+ # Cancel our worker tasks.
+ for task in tasks:
+ task.cancel()
+ # Wait until all worker tasks are cancelled.
+ await asyncio.gather(*tasks, return_exceptions=True)
+
+ print('====')
+ print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
+ print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
+
+
+ asyncio.run(main())