From 9fdc64cf1266b6d5bf0503847b5c38e5edc53a14 Mon Sep 17 00:00:00 2001 From: Kyle Stanley Date: Thu, 19 Sep 2019 08:47:22 -0400 Subject: bpo-34037: Fix test_asyncio failure and add loop.shutdown_default_executor() (GH-15735) --- Doc/library/asyncio-eventloop.rst | 12 +++++++++ Doc/library/asyncio-task.rst | 6 +++-- Lib/asyncio/base_events.py | 29 ++++++++++++++++++++++ Lib/asyncio/events.py | 4 +++ Lib/asyncio/runners.py | 1 + .../2019-09-11-21-38-41.bpo-34037.LIAS_3.rst | 4 +++ 6 files changed, 54 insertions(+), 2 deletions(-) create mode 100644 Misc/NEWS.d/next/Library/2019-09-11-21-38-41.bpo-34037.LIAS_3.rst diff --git a/Doc/library/asyncio-eventloop.rst b/Doc/library/asyncio-eventloop.rst index 8f7974b..2fd4cf3 100644 --- a/Doc/library/asyncio-eventloop.rst +++ b/Doc/library/asyncio-eventloop.rst @@ -167,6 +167,18 @@ Running and stopping the loop .. versionadded:: 3.6 +.. coroutinemethod:: loop.shutdown_default_executor() + + Schedule the closure of the default executor and wait for it to join all of + the threads in the :class:`ThreadPoolExecutor`. After calling this method, a + :exc:`RuntimeError` will be raised if :meth:`loop.run_in_executor` is called + while using the default executor. + + Note that there is no need to call this function when + :func:`asyncio.run` is used. + + .. versionadded:: 3.9 + Scheduling callbacks ^^^^^^^^^^^^^^^^^^^^ diff --git a/Doc/library/asyncio-task.rst b/Doc/library/asyncio-task.rst index 57e0e07..d932042 100644 --- a/Doc/library/asyncio-task.rst +++ b/Doc/library/asyncio-task.rst @@ -213,8 +213,8 @@ Running an asyncio Program .. function:: run(coro, \*, debug=False) This function runs the passed coroutine, taking care of - managing the asyncio event loop and *finalizing asynchronous - generators*. + managing the asyncio event loop, *finalizing asynchronous + generators*, and closing the threadpool. This function cannot be called when another asyncio event loop is running in the same thread. @@ -229,6 +229,8 @@ Running an asyncio Program **Important:** this function has been added to asyncio in Python 3.7 on a :term:`provisional basis `. + .. versionchanged:: 3.9 + Updated to use :meth:`loop.shutdown_default_executor`. Creating Tasks ============== diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index 14b80bd..0310712 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -406,6 +406,8 @@ class BaseEventLoop(events.AbstractEventLoop): self._asyncgens = weakref.WeakSet() # Set to True when `loop.shutdown_asyncgens` is called. self._asyncgens_shutdown_called = False + # Set to True when `loop.shutdown_default_executor` is called. + self._executor_shutdown_called = False def __repr__(self): return ( @@ -503,6 +505,10 @@ class BaseEventLoop(events.AbstractEventLoop): if self._closed: raise RuntimeError('Event loop is closed') + def _check_default_executor(self): + if self._executor_shutdown_called: + raise RuntimeError('Executor shutdown has been called') + def _asyncgen_finalizer_hook(self, agen): self._asyncgens.discard(agen) if not self.is_closed(): @@ -543,6 +549,26 @@ class BaseEventLoop(events.AbstractEventLoop): 'asyncgen': agen }) + async def shutdown_default_executor(self): + """Schedule the shutdown of the default executor.""" + self._executor_shutdown_called = True + if self._default_executor is None: + return + future = self.create_future() + thread = threading.Thread(target=self._do_shutdown, args=(future,)) + thread.start() + try: + await future + finally: + thread.join() + + def _do_shutdown(self, future): + try: + self._default_executor.shutdown(wait=True) + self.call_soon_threadsafe(future.set_result, None) + except Exception as ex: + self.call_soon_threadsafe(future.set_exception, ex) + def run_forever(self): """Run until stop() is called.""" self._check_closed() @@ -632,6 +658,7 @@ class BaseEventLoop(events.AbstractEventLoop): self._closed = True self._ready.clear() self._scheduled.clear() + self._executor_shutdown_called = True executor = self._default_executor if executor is not None: self._default_executor = None @@ -768,6 +795,8 @@ class BaseEventLoop(events.AbstractEventLoop): self._check_callback(func, 'run_in_executor') if executor is None: executor = self._default_executor + # Only check when the default executor is being used + self._check_default_executor() if executor is None: executor = concurrent.futures.ThreadPoolExecutor() self._default_executor = executor diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py index 5fb5464..2f06c4a 100644 --- a/Lib/asyncio/events.py +++ b/Lib/asyncio/events.py @@ -249,6 +249,10 @@ class AbstractEventLoop: """Shutdown all active asynchronous generators.""" raise NotImplementedError + async def shutdown_default_executor(self): + """Schedule the shutdown of the default executor.""" + raise NotImplementedError + # Methods scheduling callbacks. All these return Handles. def _timer_handle_cancelled(self, handle): diff --git a/Lib/asyncio/runners.py b/Lib/asyncio/runners.py index 5fbab03..6c87747 100644 --- a/Lib/asyncio/runners.py +++ b/Lib/asyncio/runners.py @@ -45,6 +45,7 @@ def run(main, *, debug=False): try: _cancel_all_tasks(loop) loop.run_until_complete(loop.shutdown_asyncgens()) + loop.run_until_complete(loop.shutdown_default_executor()) finally: events.set_event_loop(None) loop.close() diff --git a/Misc/NEWS.d/next/Library/2019-09-11-21-38-41.bpo-34037.LIAS_3.rst b/Misc/NEWS.d/next/Library/2019-09-11-21-38-41.bpo-34037.LIAS_3.rst new file mode 100644 index 0000000..7534516 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2019-09-11-21-38-41.bpo-34037.LIAS_3.rst @@ -0,0 +1,4 @@ +For :mod:`asyncio`, add a new coroutine :meth:`loop.shutdown_default_executor`. +The new coroutine provides an API to schedule an executor shutdown that waits +on the threadpool to finish closing. Also, :func:`asyncio.run` has been updated +to utilize the new coroutine. Patch by Kyle Stanley. -- cgit v0.12