summaryrefslogtreecommitdiffstats
path: root/Lib/asyncio
diff options
context:
space:
mode:
authorYury Selivanov <yury@magic.io>2017-12-23 20:04:15 (GMT)
committerGitHub <noreply@github.com>2017-12-23 20:04:15 (GMT)
commitca9b36cd1a384e5ecb56d9df9a59144240353ef0 (patch)
tree3efb5c02ae40b61eb5c6391d3144d0f2f26cd616 /Lib/asyncio
parent558aa30f7971e087c4a00b1f49cc2ef3195c01ca (diff)
downloadcpython-ca9b36cd1a384e5ecb56d9df9a59144240353ef0.zip
cpython-ca9b36cd1a384e5ecb56d9df9a59144240353ef0.tar.gz
cpython-ca9b36cd1a384e5ecb56d9df9a59144240353ef0.tar.bz2
bpo-32415: Add asyncio.Task.get_loop() and Future.get_loop() (#4992)
Diffstat (limited to 'Lib/asyncio')
-rw-r--r--Lib/asyncio/base_events.py2
-rw-r--r--Lib/asyncio/futures.py20
-rw-r--r--Lib/asyncio/tasks.py39
3 files changed, 36 insertions, 25 deletions
diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py
index 2ab8a76..96cc4f0 100644
--- a/Lib/asyncio/base_events.py
+++ b/Lib/asyncio/base_events.py
@@ -145,7 +145,7 @@ def _run_until_complete_cb(fut):
# Issue #22429: run_forever() already finished, no need to
# stop it.
return
- fut._loop.stop()
+ futures._get_loop(fut).stop()
class Server(events.AbstractServer):
diff --git a/Lib/asyncio/futures.py b/Lib/asyncio/futures.py
index b310962..24843c0 100644
--- a/Lib/asyncio/futures.py
+++ b/Lib/asyncio/futures.py
@@ -105,6 +105,10 @@ class Future:
context['source_traceback'] = self._source_traceback
self._loop.call_exception_handler(context)
+ def get_loop(self):
+ """Return the event loop the Future is bound to."""
+ return self._loop
+
def cancel(self):
"""Cancel the future and schedule callbacks.
@@ -249,6 +253,18 @@ class Future:
_PyFuture = Future
+def _get_loop(fut):
+ # Tries to call Future.get_loop() if it's available.
+ # Otherwise fallbacks to using the old '_loop' property.
+ try:
+ get_loop = fut.get_loop
+ except AttributeError:
+ pass
+ else:
+ return get_loop()
+ return fut._loop
+
+
def _set_result_unless_cancelled(fut, result):
"""Helper setting the result only if the future was not cancelled."""
if fut.cancelled():
@@ -304,8 +320,8 @@ def _chain_future(source, destination):
if not isfuture(destination) and not isinstance(destination,
concurrent.futures.Future):
raise TypeError('A future is required for destination argument')
- source_loop = source._loop if isfuture(source) else None
- dest_loop = destination._loop if isfuture(destination) else None
+ source_loop = _get_loop(source) if isfuture(source) else None
+ dest_loop = _get_loop(destination) if isfuture(destination) else None
def _set_state(future, other):
if isfuture(future):
diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py
index ff8a486..572e707 100644
--- a/Lib/asyncio/tasks.py
+++ b/Lib/asyncio/tasks.py
@@ -34,7 +34,7 @@ def all_tasks(loop=None):
"""Return a set of all tasks for the loop."""
if loop is None:
loop = events.get_event_loop()
- return {t for t, l in _all_tasks.items() if l is loop}
+ return {t for t in _all_tasks if futures._get_loop(t) is loop}
class Task(futures.Future):
@@ -96,7 +96,7 @@ class Task(futures.Future):
self._coro = coro
self._loop.call_soon(self._step)
- _register_task(self._loop, self)
+ _register_task(self)
def __del__(self):
if self._state == futures._PENDING and self._log_destroy_pending:
@@ -215,7 +215,7 @@ class Task(futures.Future):
blocking = getattr(result, '_asyncio_future_blocking', None)
if blocking is not None:
# Yielded Future must come from Future.__iter__().
- if result._loop is not self._loop:
+ if futures._get_loop(result) is not self._loop:
new_exc = RuntimeError(
f'Task {self!r} got Future '
f'{result!r} attached to a different loop')
@@ -510,9 +510,9 @@ async def sleep(delay, result=None, *, loop=None):
if loop is None:
loop = events.get_event_loop()
future = loop.create_future()
- h = future._loop.call_later(delay,
- futures._set_result_unless_cancelled,
- future, result)
+ h = loop.call_later(delay,
+ futures._set_result_unless_cancelled,
+ future, result)
try:
return await future
finally:
@@ -525,7 +525,7 @@ def ensure_future(coro_or_future, *, loop=None):
If the argument is a Future, it is returned directly.
"""
if futures.isfuture(coro_or_future):
- if loop is not None and loop is not coro_or_future._loop:
+ if loop is not None and loop is not futures._get_loop(coro_or_future):
raise ValueError('loop argument must agree with Future')
return coro_or_future
elif coroutines.iscoroutine(coro_or_future):
@@ -655,7 +655,7 @@ def gather(*coros_or_futures, loop=None, return_exceptions=False):
if arg not in arg_to_fut:
fut = ensure_future(arg, loop=loop)
if loop is None:
- loop = fut._loop
+ loop = futures._get_loop(fut)
if fut is not arg:
# 'arg' was not a Future, therefore, 'fut' is a new
# Future created specifically for 'arg'. Since the caller
@@ -707,7 +707,7 @@ def shield(arg, *, loop=None):
if inner.done():
# Shortcut.
return inner
- loop = inner._loop
+ loop = futures._get_loop(inner)
outer = loop.create_future()
def _done_callback(inner):
@@ -751,23 +751,17 @@ def run_coroutine_threadsafe(coro, loop):
return future
-# WeakKeyDictionary of {Task: EventLoop} containing all tasks alive.
-# Task should be a weak reference to remove entry on task garbage
-# collection, EventLoop is required
-# to not access to private task._loop attribute.
-_all_tasks = weakref.WeakKeyDictionary()
+# WeakSet containing all alive tasks.
+_all_tasks = weakref.WeakSet()
# Dictionary containing tasks that are currently active in
# all running event loops. {EventLoop: Task}
_current_tasks = {}
-def _register_task(loop, task):
- """Register a new task in asyncio as executed by loop.
-
- Returns None.
- """
- _all_tasks[task] = loop
+def _register_task(task):
+ """Register a new task in asyncio as executed by loop."""
+ _all_tasks.add(task)
def _enter_task(loop, task):
@@ -786,8 +780,9 @@ def _leave_task(loop, task):
del _current_tasks[loop]
-def _unregister_task(loop, task):
- _all_tasks.pop(task, None)
+def _unregister_task(task):
+ """Unregister a task."""
+ _all_tasks.discard(task)
_py_register_task = _register_task