diff options
author | Kyle Stanley <aeros167@gmail.com> | 2020-03-27 19:31:22 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-03-27 19:31:22 (GMT) |
commit | b61b818d916942aad1f8f3e33181801c4a1ed14b (patch) | |
tree | e755041eb620f5b2937639b32c0b98d1afa54c89 | |
parent | 5f9c131c099d6675d1a9d0228497865488afd548 (diff) | |
download | cpython-b61b818d916942aad1f8f3e33181801c4a1ed14b.zip cpython-b61b818d916942aad1f8f3e33181801c4a1ed14b.tar.gz cpython-b61b818d916942aad1f8f3e33181801c4a1ed14b.tar.bz2 |
bpo-39812: Remove daemon threads in concurrent.futures (GH-19149)
Remove daemon threads from :mod:`concurrent.futures` by adding
an internal `threading._register_atexit()`, which calls registered functions
prior to joining all non-daemon threads. This allows for compatibility
with subinterpreters, which don't support daemon threads.
-rw-r--r-- | Doc/whatsnew/3.9.rst | 5 | ||||
-rw-r--r-- | Lib/concurrent/futures/process.py | 23 | ||||
-rw-r--r-- | Lib/concurrent/futures/thread.py | 20 | ||||
-rw-r--r-- | Lib/test/test_threading.py | 50 | ||||
-rw-r--r-- | Lib/threading.py | 29 | ||||
-rw-r--r-- | Misc/NEWS.d/next/Library/2020-03-25-00-35-48.bpo-39812.rIKnms.rst | 4 |
6 files changed, 99 insertions, 32 deletions
diff --git a/Doc/whatsnew/3.9.rst b/Doc/whatsnew/3.9.rst index 778e443..a76445b 100644 --- a/Doc/whatsnew/3.9.rst +++ b/Doc/whatsnew/3.9.rst @@ -195,6 +195,11 @@ which have not started running, instead of waiting for them to complete before shutting down the executor. (Contributed by Kyle Stanley in :issue:`39349`.) +Removed daemon threads from :class:`~concurrent.futures.ThreadPoolExecutor` +and :class:`~concurrent.futures.ProcessPoolExecutor`. This improves +compatibility with subinterpreters and predictability in their shutdown +processes. (Contributed by Kyle Stanley in :issue:`39812`.) + curses ------ diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 39fadcc..4c39500 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -59,19 +59,6 @@ import itertools import sys import traceback -# Workers are created as daemon threads and processes. This is done to allow the -# interpreter to exit when there are still idle processes in a -# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However, -# allowing workers to die with the interpreter has two undesirable properties: -# - The workers would still be running during interpreter shutdown, -# meaning that they would fail in unpredictable ways. -# - The workers could be killed while evaluating a work item, which could -# be bad if the callable being evaluated has external side-effects e.g. -# writing to a file. -# -# To work around this problem, an exit handler is installed which tells the -# workers to exit when their work queues are empty and then waits until the -# threads/processes finish. _threads_wakeups = weakref.WeakKeyDictionary() _global_shutdown = False @@ -107,6 +94,12 @@ def _python_exit(): for t, _ in items: t.join() +# Register for `_python_exit()` to be called just before joining all +# non-daemon threads. This is used instead of `atexit.register()` for +# compatibility with subinterpreters, which no longer support daemon threads. +# See bpo-39812 for context. +threading._register_atexit(_python_exit) + # Controls how many more calls than processes will be queued in the call queue. # A smaller number will mean that processes spend more time idle waiting for # work while a larger number will make Future.cancel() succeed less frequently @@ -306,9 +299,7 @@ class _ExecutorManagerThread(threading.Thread): # {5: <_WorkItem...>, 6: <_WorkItem...>, ...} self.pending_work_items = executor._pending_work_items - # Set this thread to be daemonized super().__init__() - self.daemon = True def run(self): # Main loop for the executor manager thread. @@ -732,5 +723,3 @@ class ProcessPoolExecutor(_base.Executor): self._executor_manager_thread_wakeup = None shutdown.__doc__ = _base.Executor.shutdown.__doc__ - -atexit.register(_python_exit) diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py index be79161..2aa4e17 100644 --- a/Lib/concurrent/futures/thread.py +++ b/Lib/concurrent/futures/thread.py @@ -13,19 +13,6 @@ import threading import weakref import os -# Workers are created as daemon threads. This is done to allow the interpreter -# to exit when there are still idle threads in a ThreadPoolExecutor's thread -# pool (i.e. shutdown() was not called). However, allowing workers to die with -# the interpreter has two undesirable properties: -# - The workers would still be running during interpreter shutdown, -# meaning that they would fail in unpredictable ways. -# - The workers could be killed while evaluating a work item, which could -# be bad if the callable being evaluated has external side-effects e.g. -# writing to a file. -# -# To work around this problem, an exit handler is installed which tells the -# workers to exit when their work queues are empty and then waits until the -# threads finish. _threads_queues = weakref.WeakKeyDictionary() _shutdown = False @@ -43,7 +30,11 @@ def _python_exit(): for t, q in items: t.join() -atexit.register(_python_exit) +# Register for `_python_exit()` to be called just before joining all +# non-daemon threads. This is used instead of `atexit.register()` for +# compatibility with subinterpreters, which no longer support daemon threads. +# See bpo-39812 for context. +threading._register_atexit(_python_exit) class _WorkItem(object): @@ -197,7 +188,6 @@ class ThreadPoolExecutor(_base.Executor): self._work_queue, self._initializer, self._initargs)) - t.daemon = True t.start() self._threads.add(t) _threads_queues[t] = self._work_queue diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py index f1037b5..da17e12 100644 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@ -1397,5 +1397,55 @@ class InterruptMainTests(unittest.TestCase): signal.signal(signal.SIGINT, handler) +class AtexitTests(unittest.TestCase): + + def test_atexit_output(self): + rc, out, err = assert_python_ok("-c", """if True: + import threading + + def run_last(): + print('parrot') + + threading._register_atexit(run_last) + """) + + self.assertFalse(err) + self.assertEqual(out.strip(), b'parrot') + + def test_atexit_called_once(self): + rc, out, err = assert_python_ok("-c", """if True: + import threading + from unittest.mock import Mock + + mock = Mock() + threading._register_atexit(mock) + mock.assert_not_called() + # force early shutdown to ensure it was called once + threading._shutdown() + mock.assert_called_once() + """) + + self.assertFalse(err) + + def test_atexit_after_shutdown(self): + # The only way to do this is by registering an atexit within + # an atexit, which is intended to raise an exception. + rc, out, err = assert_python_ok("-c", """if True: + import threading + + def func(): + pass + + def run_last(): + threading._register_atexit(func) + + threading._register_atexit(run_last) + """) + + self.assertTrue(err) + self.assertIn("RuntimeError: can't register atexit after shutdown", + err.decode()) + + if __name__ == "__main__": unittest.main() diff --git a/Lib/threading.py b/Lib/threading.py index 46eb1b9..6b25e7a 100644 --- a/Lib/threading.py +++ b/Lib/threading.py @@ -3,6 +3,7 @@ import os as _os import sys as _sys import _thread +import functools from time import monotonic as _time from _weakrefset import WeakSet @@ -1346,6 +1347,27 @@ def enumerate(): with _active_limbo_lock: return list(_active.values()) + list(_limbo.values()) + +_threading_atexits = [] +_SHUTTING_DOWN = False + +def _register_atexit(func, *arg, **kwargs): + """CPython internal: register *func* to be called before joining threads. + + The registered *func* is called with its arguments just before all + non-daemon threads are joined in `_shutdown()`. It provides a similar + purpose to `atexit.register()`, but its functions are called prior to + threading shutdown instead of interpreter shutdown. + + For similarity to atexit, the registered functions are called in reverse. + """ + if _SHUTTING_DOWN: + raise RuntimeError("can't register atexit after shutdown") + + call = functools.partial(func, *arg, **kwargs) + _threading_atexits.append(call) + + from _thread import stack_size # Create the main thread object, @@ -1367,6 +1389,8 @@ def _shutdown(): # _shutdown() was already called return + global _SHUTTING_DOWN + _SHUTTING_DOWN = True # Main thread tlock = _main_thread._tstate_lock # The main thread isn't finished yet, so its thread state lock can't have @@ -1376,6 +1400,11 @@ def _shutdown(): tlock.release() _main_thread._stop() + # Call registered threading atexit functions before threads are joined. + # Order is reversed, similar to atexit. + for atexit_call in reversed(_threading_atexits): + atexit_call() + # Join all non-deamon threads while True: with _shutdown_locks_lock: diff --git a/Misc/NEWS.d/next/Library/2020-03-25-00-35-48.bpo-39812.rIKnms.rst b/Misc/NEWS.d/next/Library/2020-03-25-00-35-48.bpo-39812.rIKnms.rst new file mode 100644 index 0000000..4cea878 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2020-03-25-00-35-48.bpo-39812.rIKnms.rst @@ -0,0 +1,4 @@ +Removed daemon threads from :mod:`concurrent.futures` by adding
+an internal `threading._register_atexit()`, which calls registered functions
+prior to joining all non-daemon threads. This allows for compatibility
+with subinterpreters, which don't support daemon threads.
\ No newline at end of file |