diff options
author | Yury Selivanov <yury@magic.io> | 2017-12-23 20:04:15 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-12-23 20:04:15 (GMT) |
commit | ca9b36cd1a384e5ecb56d9df9a59144240353ef0 (patch) | |
tree | 3efb5c02ae40b61eb5c6391d3144d0f2f26cd616 /Lib/asyncio | |
parent | 558aa30f7971e087c4a00b1f49cc2ef3195c01ca (diff) | |
download | cpython-ca9b36cd1a384e5ecb56d9df9a59144240353ef0.zip cpython-ca9b36cd1a384e5ecb56d9df9a59144240353ef0.tar.gz cpython-ca9b36cd1a384e5ecb56d9df9a59144240353ef0.tar.bz2 |
bpo-32415: Add asyncio.Task.get_loop() and Future.get_loop() (#4992)
Diffstat (limited to 'Lib/asyncio')
-rw-r--r-- | Lib/asyncio/base_events.py | 2 | ||||
-rw-r--r-- | Lib/asyncio/futures.py | 20 | ||||
-rw-r--r-- | Lib/asyncio/tasks.py | 39 |
3 files changed, 36 insertions, 25 deletions
diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index 2ab8a76..96cc4f0 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -145,7 +145,7 @@ def _run_until_complete_cb(fut): # Issue #22429: run_forever() already finished, no need to # stop it. return - fut._loop.stop() + futures._get_loop(fut).stop() class Server(events.AbstractServer): diff --git a/Lib/asyncio/futures.py b/Lib/asyncio/futures.py index b310962..24843c0 100644 --- a/Lib/asyncio/futures.py +++ b/Lib/asyncio/futures.py @@ -105,6 +105,10 @@ class Future: context['source_traceback'] = self._source_traceback self._loop.call_exception_handler(context) + def get_loop(self): + """Return the event loop the Future is bound to.""" + return self._loop + def cancel(self): """Cancel the future and schedule callbacks. @@ -249,6 +253,18 @@ class Future: _PyFuture = Future +def _get_loop(fut): + # Tries to call Future.get_loop() if it's available. + # Otherwise fallbacks to using the old '_loop' property. + try: + get_loop = fut.get_loop + except AttributeError: + pass + else: + return get_loop() + return fut._loop + + def _set_result_unless_cancelled(fut, result): """Helper setting the result only if the future was not cancelled.""" if fut.cancelled(): @@ -304,8 +320,8 @@ def _chain_future(source, destination): if not isfuture(destination) and not isinstance(destination, concurrent.futures.Future): raise TypeError('A future is required for destination argument') - source_loop = source._loop if isfuture(source) else None - dest_loop = destination._loop if isfuture(destination) else None + source_loop = _get_loop(source) if isfuture(source) else None + dest_loop = _get_loop(destination) if isfuture(destination) else None def _set_state(future, other): if isfuture(future): diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index ff8a486..572e707 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -34,7 +34,7 @@ def all_tasks(loop=None): """Return a set of all tasks for the loop.""" if loop is None: loop = events.get_event_loop() - return {t for t, l in _all_tasks.items() if l is loop} + return {t for t in _all_tasks if futures._get_loop(t) is loop} class Task(futures.Future): @@ -96,7 +96,7 @@ class Task(futures.Future): self._coro = coro self._loop.call_soon(self._step) - _register_task(self._loop, self) + _register_task(self) def __del__(self): if self._state == futures._PENDING and self._log_destroy_pending: @@ -215,7 +215,7 @@ class Task(futures.Future): blocking = getattr(result, '_asyncio_future_blocking', None) if blocking is not None: # Yielded Future must come from Future.__iter__(). - if result._loop is not self._loop: + if futures._get_loop(result) is not self._loop: new_exc = RuntimeError( f'Task {self!r} got Future ' f'{result!r} attached to a different loop') @@ -510,9 +510,9 @@ async def sleep(delay, result=None, *, loop=None): if loop is None: loop = events.get_event_loop() future = loop.create_future() - h = future._loop.call_later(delay, - futures._set_result_unless_cancelled, - future, result) + h = loop.call_later(delay, + futures._set_result_unless_cancelled, + future, result) try: return await future finally: @@ -525,7 +525,7 @@ def ensure_future(coro_or_future, *, loop=None): If the argument is a Future, it is returned directly. """ if futures.isfuture(coro_or_future): - if loop is not None and loop is not coro_or_future._loop: + if loop is not None and loop is not futures._get_loop(coro_or_future): raise ValueError('loop argument must agree with Future') return coro_or_future elif coroutines.iscoroutine(coro_or_future): @@ -655,7 +655,7 @@ def gather(*coros_or_futures, loop=None, return_exceptions=False): if arg not in arg_to_fut: fut = ensure_future(arg, loop=loop) if loop is None: - loop = fut._loop + loop = futures._get_loop(fut) if fut is not arg: # 'arg' was not a Future, therefore, 'fut' is a new # Future created specifically for 'arg'. Since the caller @@ -707,7 +707,7 @@ def shield(arg, *, loop=None): if inner.done(): # Shortcut. return inner - loop = inner._loop + loop = futures._get_loop(inner) outer = loop.create_future() def _done_callback(inner): @@ -751,23 +751,17 @@ def run_coroutine_threadsafe(coro, loop): return future -# WeakKeyDictionary of {Task: EventLoop} containing all tasks alive. -# Task should be a weak reference to remove entry on task garbage -# collection, EventLoop is required -# to not access to private task._loop attribute. -_all_tasks = weakref.WeakKeyDictionary() +# WeakSet containing all alive tasks. +_all_tasks = weakref.WeakSet() # Dictionary containing tasks that are currently active in # all running event loops. {EventLoop: Task} _current_tasks = {} -def _register_task(loop, task): - """Register a new task in asyncio as executed by loop. - - Returns None. - """ - _all_tasks[task] = loop +def _register_task(task): + """Register a new task in asyncio as executed by loop.""" + _all_tasks.add(task) def _enter_task(loop, task): @@ -786,8 +780,9 @@ def _leave_task(loop, task): del _current_tasks[loop] -def _unregister_task(loop, task): - _all_tasks.pop(task, None) +def _unregister_task(task): + """Unregister a task.""" + _all_tasks.discard(task) _py_register_task = _register_task |