summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVictor Stinner <victor.stinner@gmail.com>2014-07-08 10:43:24 (GMT)
committerVictor Stinner <victor.stinner@gmail.com>2014-07-08 10:43:24 (GMT)
commit4532c43e16c8d40101759f8ddd0e5631c3626e34 (patch)
treeaa3488fc5d181606adf0f08b0da140a4c8fc18ec
parentde08cb60fdf40c4a684bb4c90b5186f9bf89b6d7 (diff)
parent530ef2f0693d50435a8d62ea84d3fdcbe662d8aa (diff)
downloadcpython-4532c43e16c8d40101759f8ddd0e5631c3626e34.zip
cpython-4532c43e16c8d40101759f8ddd0e5631c3626e34.tar.gz
cpython-4532c43e16c8d40101759f8ddd0e5631c3626e34.tar.bz2
Merge 3.4
asyncio: sync with Tulip - Tulip issue 185: Add a create_task() method to event loops. The create_task() method can be overriden in custom event loop to implement their own task class. For example, greenio and Pulsar projects use their own task class. The create_task() method is now preferred over creating directly task using the Task class. - tests: fix a warning - fix typo in the name of a test function - Update AbstractEventLoop: add new event loop methods; update also the unit test Update asyncio documentation - Document the new create_task() method - "Hide" the Task class: point to the create_task() method for interoperability - Rewrite the documentation of the Task class - Document the "Pending task destroyed" - Update output in debug mode of examples in the dev section - Replace Task() with create_task() in examples
-rw-r--r--Doc/library/asyncio-dev.rst95
-rw-r--r--Doc/library/asyncio-eventloop.rst23
-rw-r--r--Doc/library/asyncio-stream.rst3
-rw-r--r--Doc/library/asyncio-task.rst59
-rw-r--r--Lib/asyncio/base_events.py6
-rw-r--r--Lib/asyncio/events.py9
-rw-r--r--Lib/asyncio/streams.py2
-rw-r--r--Lib/asyncio/tasks.py4
-rw-r--r--Lib/asyncio/test_utils.py2
-rw-r--r--Lib/test/test_asyncio/test_base_events.py24
-rw-r--r--Lib/test/test_asyncio/test_events.py14
-rw-r--r--Lib/test/test_asyncio/test_futures.py4
-rw-r--r--Lib/test/test_asyncio/test_tasks.py3
13 files changed, 194 insertions, 54 deletions
diff --git a/Doc/library/asyncio-dev.rst b/Doc/library/asyncio-dev.rst
index 2b3ad94..bf68121 100644
--- a/Doc/library/asyncio-dev.rst
+++ b/Doc/library/asyncio-dev.rst
@@ -103,20 +103,11 @@ the logger ``'asyncio'``.
Detect coroutine objects never scheduled
----------------------------------------
-When a coroutine function is called but not passed to :func:`async` or to the
-:class:`Task` constructor, it is not scheduled and it is probably a bug.
-
-To detect such bug, :ref:`enable the debug mode of asyncio
-<asyncio-debug-mode>`. When the coroutine object is destroyed by the garbage
-collector, a log will be emitted with the traceback where the coroutine
-function was called. See the :ref:`asyncio logger <asyncio-logger>`.
-
-The debug flag changes the behaviour of the :func:`coroutine` decorator. The
-debug flag value is only used when then coroutine function is defined, not when
-it is called. Coroutine functions defined before the debug flag is set to
-``True`` will not be tracked. For example, it is not possible to debug
-coroutines defined in the :mod:`asyncio` module, because the module must be
-imported before the flag value can be changed.
+When a coroutine function is called and its result is not passed to
+:func:`async` or to the :meth:`BaseEventLoop.create_task` method: the execution
+of the coroutine objet will never be scheduled and it is probably a bug.
+:ref:`Enable the debug mode of asyncio <asyncio-debug-mode>` to :ref:`log a
+warning <asyncio-logger>` to detect it.
Example with the bug::
@@ -130,20 +121,27 @@ Example with the bug::
Output in debug mode::
- Coroutine 'test' defined at test.py:4 was never yielded from
+ Coroutine test() at test.py:3 was never yielded from
+ Coroutine object created at (most recent call last):
+ File "test.py", line 7, in <module>
+ test()
-The fix is to call the :func:`async` function or create a :class:`Task` object
-with this coroutine object.
+The fix is to call the :func:`async` function or the
+:meth:`BaseEventLoop.create_task` method with the coroutine object.
+.. seealso::
+
+ :ref:`Pending task destroyed <asyncio-pending-task-destroyed>`.
-Detect exceptions not consumed
-------------------------------
+
+Detect exceptions never consumed
+--------------------------------
Python usually calls :func:`sys.displayhook` on unhandled exceptions. If
-:meth:`Future.set_exception` is called, but the exception is not consumed,
-:func:`sys.displayhook` is not called. Instead, a log is emitted when the
-future is deleted by the garbage collector, with the traceback where the
-exception was raised. See the :ref:`asyncio logger <asyncio-logger>`.
+:meth:`Future.set_exception` is called, but the exception is never consumed,
+:func:`sys.displayhook` is not called. Instead, a :ref:`a log is emitted
+<asyncio-logger>` when the future is deleted by the garbage collector, with the
+traceback where the exception was raised.
Example of unhandled exception::
@@ -159,16 +157,27 @@ Example of unhandled exception::
Output::
- Future/Task exception was never retrieved:
+ Task exception was never retrieved
+ future: <Task finished bug() done at asyncio/coroutines.py:139 exception=Exception('not consumed',)>
+ source_traceback: Object created at (most recent call last):
+ File "test.py", line 10, in <module>
+ asyncio.async(bug())
+ File "asyncio/tasks.py", line 510, in async
+ task = loop.create_task(coro_or_future)
Traceback (most recent call last):
- File "/usr/lib/python3.4/asyncio/tasks.py", line 279, in _step
+ File "asyncio/tasks.py", line 244, in _step
result = next(coro)
- File "/usr/lib/python3.4/asyncio/tasks.py", line 80, in coro
+ File "coroutines.py", line 78, in __next__
+ return next(self.gen)
+ File "asyncio/coroutines.py", line 141, in coro
res = func(*args, **kw)
- File "test.py", line 5, in bug
+ File "test.py", line 7, in bug
raise Exception("not consumed")
Exception: not consumed
+:ref:`Enable the debug mode of asyncio <asyncio-debug-mode>` to get the
+traceback where the task was created.
+
There are different options to fix this issue. The first option is to chain to
coroutine in another coroutine and use classic try/except::
@@ -195,7 +204,7 @@ function::
See also the :meth:`Future.exception` method.
-Chain coroutines correctly
+Chain correctly coroutines
--------------------------
When a coroutine function calls other coroutine functions and tasks, they
@@ -246,7 +255,9 @@ Actual output::
(3) close file
(2) write into file
- Pending tasks at exit: {Task(<create>)<PENDING>}
+ Pending tasks at exit: {<Task pending create() at test.py:7 wait_for=<Future pending cb=[Task._wakeup()]>>}
+ Task was destroyed but it is pending!
+ task: <Task pending create() done at test.py:5 wait_for=<Future pending cb=[Task._wakeup()]>>
The loop stopped before the ``create()`` finished, ``close()`` has been called
before ``write()``, whereas coroutine functions were called in this order:
@@ -272,3 +283,29 @@ Or without ``asyncio.async()``::
yield from asyncio.sleep(2.0)
loop.stop()
+
+.. _asyncio-pending-task-destroyed:
+
+Pending task destroyed
+----------------------
+
+If a pending task is destroyed, the execution of its wrapped :ref:`coroutine
+<coroutine>` did not complete. It is probably a bug and so a warning is logged.
+
+Example of log::
+
+ Task was destroyed but it is pending!
+ source_traceback: Object created at (most recent call last):
+ File "test.py", line 17, in <module>
+ task = asyncio.async(coro, loop=loop)
+ File "asyncio/tasks.py", line 510, in async
+ task = loop.create_task(coro_or_future)
+ task: <Task pending kill_me() done at test.py:5 wait_for=<Future pending cb=[Task._wakeup()]>>
+
+:ref:`Enable the debug mode of asyncio <asyncio-debug-mode>` to get the
+traceback where the task was created.
+
+.. seealso::
+
+ :ref:`Detect coroutine objects never scheduled <asyncio-coroutine-not-scheduled>`.
+
diff --git a/Doc/library/asyncio-eventloop.rst b/Doc/library/asyncio-eventloop.rst
index 268fa41..1a80921 100644
--- a/Doc/library/asyncio-eventloop.rst
+++ b/Doc/library/asyncio-eventloop.rst
@@ -102,8 +102,8 @@ Run an event loop
Run until the :class:`Future` is done.
- If the argument is a :ref:`coroutine <coroutine>`, it is wrapped
- in a :class:`Task`.
+ If the argument is a :ref:`coroutine object <coroutine>`, it is wrapped by
+ :func:`async`.
Return the Future's result, or raise its exception.
@@ -205,6 +205,25 @@ a different clock than :func:`time.time`.
The :func:`asyncio.sleep` function.
+Coroutines
+----------
+
+.. method:: BaseEventLoop.create_task(coro)
+
+ Schedule the execution of a :ref:`coroutine object <coroutine>`: wrap it in
+ a future. Return a :class:`Task` object.
+
+ Third-party event loops can use their own subclass of :class:`Task` for
+ interoperability. In this case, the result type is a subclass of
+ :class:`Task`.
+
+ .. seealso::
+
+ The :meth:`async` function.
+
+ .. versionadded:: 3.4.2
+
+
Creating connections
--------------------
diff --git a/Doc/library/asyncio-stream.rst b/Doc/library/asyncio-stream.rst
index 4543af4..f6b126d 100644
--- a/Doc/library/asyncio-stream.rst
+++ b/Doc/library/asyncio-stream.rst
@@ -41,7 +41,8 @@ Stream functions
:class:`StreamReader` object, while *client_writer* is a
:class:`StreamWriter` object. This parameter can either be a plain callback
function or a :ref:`coroutine function <coroutine>`; if it is a coroutine
- function, it will be automatically converted into a :class:`Task`.
+ function, it will be automatically wrapped in a future using the
+ :meth:`BaseEventLoop.create_task` method.
The rest of the arguments are all the usual arguments to
:meth:`~BaseEventLoop.create_server()` except *protocol_factory*; most
diff --git a/Doc/library/asyncio-task.rst b/Doc/library/asyncio-task.rst
index 3544657..316a694 100644
--- a/Doc/library/asyncio-task.rst
+++ b/Doc/library/asyncio-task.rst
@@ -51,8 +51,8 @@ generator, and the coroutine object returned by the call is really a
generator object, which doesn't do anything until you iterate over it.
In the case of a coroutine object, there are two basic ways to start
it running: call ``yield from coroutine`` from another coroutine
-(assuming the other coroutine is already running!), or convert it to a
-:class:`Task`.
+(assuming the other coroutine is already running!), or schedule its execution
+using the :meth:`BaseEventLoop.create_task` method.
Coroutines (and tasks) can only run when the event loop is running.
@@ -256,7 +256,7 @@ Example combining a :class:`Future` and a :ref:`coroutine function
loop = asyncio.get_event_loop()
future = asyncio.Future()
- asyncio.Task(slow_operation(future))
+ loop.create_task(slow_operation(future))
loop.run_until_complete(future)
print(future.result())
loop.close()
@@ -292,7 +292,7 @@ flow::
loop = asyncio.get_event_loop()
future = asyncio.Future()
- asyncio.Task(slow_operation(future))
+ loop.create_task(slow_operation(future))
future.add_done_callback(got_result)
try:
loop.run_forever()
@@ -314,7 +314,33 @@ Task
.. class:: Task(coro, \*, loop=None)
- A coroutine object wrapped in a :class:`Future`. Subclass of :class:`Future`.
+ Schedule the execution of a :ref:`coroutine <coroutine>`: wrap it in a
+ future. A task is a subclass of :class:`Future`.
+
+ A task is responsible to execute a coroutine object in an event loop. If
+ the wrapped coroutine yields from a future, the task suspends the execution
+ of the wrapped coroutine and waits for the completition of the future. When
+ the future is done, the execution of the wrapped coroutine restarts with the
+ result or the exception of the future.
+
+ Event loops use cooperative scheduling: an event loop only runs one task at
+ the same time. Other tasks may run in parallel if other event loops are
+ running in different threads. While a task waits for the completion of a
+ future, the event loop executes a new task.
+
+ The cancellation of a task is different than cancelling a future. Calling
+ :meth:`cancel` will throw a :exc:`~concurrent.futures.CancelledError` to the
+ wrapped coroutine. :meth:`~Future.cancelled` only returns ``True`` if the
+ wrapped coroutine did not catch the
+ :exc:`~concurrent.futures.CancelledError` exception, or raised a
+ :exc:`~concurrent.futures.CancelledError` exception.
+
+ If a pending task is destroyed, the execution of its wrapped :ref:`coroutine
+ <coroutine>` did not complete. It is probably a bug and a warning is
+ logged: see :ref:`Pending task destroyed <asyncio-pending-task-destroyed>`.
+
+ Don't create directly :class:`Task` instances: use the
+ :meth:`BaseEventLoop.create_task` method.
.. classmethod:: all_tasks(loop=None)
@@ -396,12 +422,11 @@ Example executing 3 tasks (A, B, C) in parallel::
f *= i
print("Task %s: factorial(%s) = %s" % (name, number, f))
- tasks = [
- asyncio.Task(factorial("A", 2)),
- asyncio.Task(factorial("B", 3)),
- asyncio.Task(factorial("C", 4))]
-
loop = asyncio.get_event_loop()
+ tasks = [
+ loop.create_task(factorial("A", 2)),
+ loop.create_task(factorial("B", 3)),
+ loop.create_task(factorial("C", 4))]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
@@ -450,7 +475,8 @@ Task functions
.. function:: async(coro_or_future, \*, loop=None)
- Wrap a :ref:`coroutine object <coroutine>` in a future.
+ Wrap a :ref:`coroutine object <coroutine>` in a future using the
+ :meth:`BaseEventLoop.create_task` method.
If the argument is a :class:`Future`, it is returned directly.
@@ -566,18 +592,17 @@ Task functions
.. function:: wait_for(fut, timeout, \*, loop=None)
Wait for the single :class:`Future` or :ref:`coroutine object <coroutine>`
- to complete, with timeout. If *timeout* is ``None``, block until the future
+ to complete with timeout. If *timeout* is ``None``, block until the future
completes.
- Coroutine will be wrapped in :class:`Task`.
+ Coroutine objects are wrapped in a future using the
+ :meth:`BaseEventLoop.create_task` method.
Returns result of the Future or coroutine. When a timeout occurs, it
cancels the task and raises :exc:`asyncio.TimeoutError`. To avoid the task
cancellation, wrap it in :func:`shield`.
- This function is a :ref:`coroutine <coroutine>`.
-
- Usage::
+ This function is a :ref:`coroutine <coroutine>`, usage::
- result = yield from asyncio.wait_for(fut, 60.0)
+ result = yield from asyncio.wait_for(fut, 60.0)
diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py
index 2230dc2..52c5517 100644
--- a/Lib/asyncio/base_events.py
+++ b/Lib/asyncio/base_events.py
@@ -151,6 +151,12 @@ class BaseEventLoop(events.AbstractEventLoop):
% (self.__class__.__name__, self.is_running(),
self.is_closed(), self.get_debug()))
+ def create_task(self, coro):
+ """Schedule a coroutine object.
+
+ Return a task object."""
+ return tasks.Task(coro, loop=self)
+
def _make_socket_transport(self, sock, protocol, waiter=None, *,
extra=None, server=None):
"""Create socket transport."""
diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py
index b389cfb..1f5e582 100644
--- a/Lib/asyncio/events.py
+++ b/Lib/asyncio/events.py
@@ -200,6 +200,10 @@ class AbstractEventLoop:
"""Return whether the event loop is currently running."""
raise NotImplementedError
+ def is_closed(self):
+ """Returns True if the event loop was closed."""
+ raise NotImplementedError
+
def close(self):
"""Close the loop.
@@ -225,6 +229,11 @@ class AbstractEventLoop:
def time(self):
raise NotImplementedError
+ # Method scheduling a coroutine object: create a task.
+
+ def create_task(self, coro):
+ raise NotImplementedError
+
# Methods for interacting with threads.
def call_soon_threadsafe(self, callback, *args):
diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py
index a10b969..9bde218 100644
--- a/Lib/asyncio/streams.py
+++ b/Lib/asyncio/streams.py
@@ -213,7 +213,7 @@ class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
res = self._client_connected_cb(self._stream_reader,
self._stream_writer)
if coroutines.iscoroutine(res):
- tasks.Task(res, loop=self._loop)
+ self._loop.create_task(res)
def connection_lost(self, exc):
if exc is None:
diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py
index 8c7217b..befc296 100644
--- a/Lib/asyncio/tasks.py
+++ b/Lib/asyncio/tasks.py
@@ -505,7 +505,9 @@ def async(coro_or_future, *, loop=None):
raise ValueError('loop argument must agree with Future')
return coro_or_future
elif coroutines.iscoroutine(coro_or_future):
- task = Task(coro_or_future, loop=loop)
+ if loop is None:
+ loop = events.get_event_loop()
+ task = loop.create_task(coro_or_future)
if task._source_traceback:
del task._source_traceback[-1]
return task
diff --git a/Lib/asyncio/test_utils.py b/Lib/asyncio/test_utils.py
index ef3be23..6abcaf1 100644
--- a/Lib/asyncio/test_utils.py
+++ b/Lib/asyncio/test_utils.py
@@ -48,7 +48,7 @@ def run_briefly(loop):
def once():
pass
gen = once()
- t = tasks.Task(gen, loop=loop)
+ t = loop.create_task(gen)
# Don't log a warning if the task is not done after run_until_complete().
# It occurs if the loop is stopped or if a task raises a BaseException.
t._log_destroy_pending = False
diff --git a/Lib/test/test_asyncio/test_base_events.py b/Lib/test/test_asyncio/test_base_events.py
index adba082..f6da7c3 100644
--- a/Lib/test/test_asyncio/test_base_events.py
+++ b/Lib/test/test_asyncio/test_base_events.py
@@ -12,6 +12,7 @@ from test.support import IPV6_ENABLED
import asyncio
from asyncio import base_events
+from asyncio import events
from asyncio import constants
from asyncio import test_utils
@@ -526,6 +527,29 @@ class BaseEventLoopTests(test_utils.TestCase):
PYTHONASYNCIODEBUG='1')
self.assertEqual(stdout.rstrip(), b'False')
+ def test_create_task(self):
+ class MyTask(asyncio.Task):
+ pass
+
+ @asyncio.coroutine
+ def test():
+ pass
+
+ class EventLoop(base_events.BaseEventLoop):
+ def create_task(self, coro):
+ return MyTask(coro, loop=loop)
+
+ loop = EventLoop()
+ self.set_event_loop(loop)
+
+ coro = test()
+ task = asyncio.async(coro, loop=loop)
+ self.assertIsInstance(task, MyTask)
+
+ # make warnings quiet
+ task._log_destroy_pending = False
+ coro.close()
+
class MyProto(asyncio.Protocol):
done = None
diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py
index beb6cec..b89416f 100644
--- a/Lib/test/test_asyncio/test_events.py
+++ b/Lib/test/test_asyncio/test_events.py
@@ -1969,8 +1969,12 @@ class AbstractEventLoopTests(unittest.TestCase):
self.assertRaises(
NotImplementedError, loop.is_running)
self.assertRaises(
+ NotImplementedError, loop.is_closed)
+ self.assertRaises(
NotImplementedError, loop.close)
self.assertRaises(
+ NotImplementedError, loop.create_task, None)
+ self.assertRaises(
NotImplementedError, loop.call_later, None, None)
self.assertRaises(
NotImplementedError, loop.call_at, f, f)
@@ -2027,6 +2031,16 @@ class AbstractEventLoopTests(unittest.TestCase):
mock.sentinel)
self.assertRaises(
NotImplementedError, loop.subprocess_exec, f)
+ self.assertRaises(
+ NotImplementedError, loop.set_exception_handler, f)
+ self.assertRaises(
+ NotImplementedError, loop.default_exception_handler, f)
+ self.assertRaises(
+ NotImplementedError, loop.call_exception_handler, f)
+ self.assertRaises(
+ NotImplementedError, loop.get_debug)
+ self.assertRaises(
+ NotImplementedError, loop.set_debug, f)
class ProtocolsAbsTests(unittest.TestCase):
diff --git a/Lib/test/test_asyncio/test_futures.py b/Lib/test/test_asyncio/test_futures.py
index a6071ea..157adb7 100644
--- a/Lib/test/test_asyncio/test_futures.py
+++ b/Lib/test/test_asyncio/test_futures.py
@@ -301,12 +301,12 @@ class FutureTests(test_utils.TestCase):
def test_future_exception_never_retrieved(self, m_log):
self.loop.set_debug(True)
- def memroy_error():
+ def memory_error():
try:
raise MemoryError()
except BaseException as exc:
return exc
- exc = memroy_error()
+ exc = memory_error()
future = asyncio.Future(loop=self.loop)
source_traceback = future._source_traceback
diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py
index eaef05b..afadc7c 100644
--- a/Lib/test/test_asyncio/test_tasks.py
+++ b/Lib/test/test_asyncio/test_tasks.py
@@ -233,6 +233,9 @@ class TaskTests(test_utils.TestCase):
self.assertRegex(repr(task),
'<Task .* wait_for=%s>' % re.escape(repr(fut)))
+ fut.set_result(None)
+ self.loop.run_until_complete(task)
+
def test_task_basics(self):
@asyncio.coroutine
def outer():