diff options
author | Kyle Stanley <aeros167@gmail.com> | 2019-09-19 12:47:22 (GMT) |
---|---|---|
committer | Andrew Svetlov <andrew.svetlov@gmail.com> | 2019-09-19 12:47:22 (GMT) |
commit | 9fdc64cf1266b6d5bf0503847b5c38e5edc53a14 (patch) | |
tree | b2c07128fa5ef4fe3117604235199a618c0d10e5 /Lib/asyncio/base_events.py | |
parent | 3171d67a6aaf7fe88685b3a80644f0284686ef63 (diff) | |
download | cpython-9fdc64cf1266b6d5bf0503847b5c38e5edc53a14.zip cpython-9fdc64cf1266b6d5bf0503847b5c38e5edc53a14.tar.gz cpython-9fdc64cf1266b6d5bf0503847b5c38e5edc53a14.tar.bz2 |
bpo-34037: Fix test_asyncio failure and add loop.shutdown_default_executor() (GH-15735)
Diffstat (limited to 'Lib/asyncio/base_events.py')
-rw-r--r-- | Lib/asyncio/base_events.py | 29 |
1 files changed, 29 insertions, 0 deletions
diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index 14b80bd..0310712 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -406,6 +406,8 @@ class BaseEventLoop(events.AbstractEventLoop): self._asyncgens = weakref.WeakSet() # Set to True when `loop.shutdown_asyncgens` is called. self._asyncgens_shutdown_called = False + # Set to True when `loop.shutdown_default_executor` is called. + self._executor_shutdown_called = False def __repr__(self): return ( @@ -503,6 +505,10 @@ class BaseEventLoop(events.AbstractEventLoop): if self._closed: raise RuntimeError('Event loop is closed') + def _check_default_executor(self): + if self._executor_shutdown_called: + raise RuntimeError('Executor shutdown has been called') + def _asyncgen_finalizer_hook(self, agen): self._asyncgens.discard(agen) if not self.is_closed(): @@ -543,6 +549,26 @@ class BaseEventLoop(events.AbstractEventLoop): 'asyncgen': agen }) + async def shutdown_default_executor(self): + """Schedule the shutdown of the default executor.""" + self._executor_shutdown_called = True + if self._default_executor is None: + return + future = self.create_future() + thread = threading.Thread(target=self._do_shutdown, args=(future,)) + thread.start() + try: + await future + finally: + thread.join() + + def _do_shutdown(self, future): + try: + self._default_executor.shutdown(wait=True) + self.call_soon_threadsafe(future.set_result, None) + except Exception as ex: + self.call_soon_threadsafe(future.set_exception, ex) + def run_forever(self): """Run until stop() is called.""" self._check_closed() @@ -632,6 +658,7 @@ class BaseEventLoop(events.AbstractEventLoop): self._closed = True self._ready.clear() self._scheduled.clear() + self._executor_shutdown_called = True executor = self._default_executor if executor is not None: self._default_executor = None @@ -768,6 +795,8 @@ class BaseEventLoop(events.AbstractEventLoop): self._check_callback(func, 'run_in_executor') if executor is None: executor = self._default_executor + # Only check when the default executor is being used + self._check_default_executor() if executor is None: executor = concurrent.futures.ThreadPoolExecutor() self._default_executor = executor |