summaryrefslogtreecommitdiffstats
path: root/Lib/asyncio/base_events.py
diff options
context:
space:
mode:
authorKyle Stanley <aeros167@gmail.com>2019-09-19 12:47:22 (GMT)
committerAndrew Svetlov <andrew.svetlov@gmail.com>2019-09-19 12:47:22 (GMT)
commit9fdc64cf1266b6d5bf0503847b5c38e5edc53a14 (patch)
treeb2c07128fa5ef4fe3117604235199a618c0d10e5 /Lib/asyncio/base_events.py
parent3171d67a6aaf7fe88685b3a80644f0284686ef63 (diff)
downloadcpython-9fdc64cf1266b6d5bf0503847b5c38e5edc53a14.zip
cpython-9fdc64cf1266b6d5bf0503847b5c38e5edc53a14.tar.gz
cpython-9fdc64cf1266b6d5bf0503847b5c38e5edc53a14.tar.bz2
bpo-34037: Fix test_asyncio failure and add loop.shutdown_default_executor() (GH-15735)
Diffstat (limited to 'Lib/asyncio/base_events.py')
-rw-r--r--Lib/asyncio/base_events.py29
1 files changed, 29 insertions, 0 deletions
diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py
index 14b80bd..0310712 100644
--- a/Lib/asyncio/base_events.py
+++ b/Lib/asyncio/base_events.py
@@ -406,6 +406,8 @@ class BaseEventLoop(events.AbstractEventLoop):
self._asyncgens = weakref.WeakSet()
# Set to True when `loop.shutdown_asyncgens` is called.
self._asyncgens_shutdown_called = False
+ # Set to True when `loop.shutdown_default_executor` is called.
+ self._executor_shutdown_called = False
def __repr__(self):
return (
@@ -503,6 +505,10 @@ class BaseEventLoop(events.AbstractEventLoop):
if self._closed:
raise RuntimeError('Event loop is closed')
+ def _check_default_executor(self):
+ if self._executor_shutdown_called:
+ raise RuntimeError('Executor shutdown has been called')
+
def _asyncgen_finalizer_hook(self, agen):
self._asyncgens.discard(agen)
if not self.is_closed():
@@ -543,6 +549,26 @@ class BaseEventLoop(events.AbstractEventLoop):
'asyncgen': agen
})
+ async def shutdown_default_executor(self):
+ """Schedule the shutdown of the default executor."""
+ self._executor_shutdown_called = True
+ if self._default_executor is None:
+ return
+ future = self.create_future()
+ thread = threading.Thread(target=self._do_shutdown, args=(future,))
+ thread.start()
+ try:
+ await future
+ finally:
+ thread.join()
+
+ def _do_shutdown(self, future):
+ try:
+ self._default_executor.shutdown(wait=True)
+ self.call_soon_threadsafe(future.set_result, None)
+ except Exception as ex:
+ self.call_soon_threadsafe(future.set_exception, ex)
+
def run_forever(self):
"""Run until stop() is called."""
self._check_closed()
@@ -632,6 +658,7 @@ class BaseEventLoop(events.AbstractEventLoop):
self._closed = True
self._ready.clear()
self._scheduled.clear()
+ self._executor_shutdown_called = True
executor = self._default_executor
if executor is not None:
self._default_executor = None
@@ -768,6 +795,8 @@ class BaseEventLoop(events.AbstractEventLoop):
self._check_callback(func, 'run_in_executor')
if executor is None:
executor = self._default_executor
+ # Only check when the default executor is being used
+ self._check_default_executor()
if executor is None:
executor = concurrent.futures.ThreadPoolExecutor()
self._default_executor = executor