From c4b695f85e141f57d22d8edf7bc2c756da136918 Mon Sep 17 00:00:00 2001 From: Mark Nemec Date: Tue, 10 Apr 2018 18:23:14 +0100 Subject: bpo-33097: Fix submit accepting callable after executor shutdown by interpreter exit (GH-6144) Executors in concurrent.futures accepted tasks after executor was shutdown by interpreter exit. Tasks were left in PENDING state forever. This fix changes submit to instead raise a RuntimeError. --- Lib/concurrent/futures/process.py | 7 ++++++ Lib/concurrent/futures/thread.py | 7 ++++++ Lib/test/test_concurrent_futures.py | 28 ++++++++++++++++++++++ .../2018-03-18-16-48-23.bpo-33097.Yl4gI2.rst | 2 ++ 4 files changed, 44 insertions(+) create mode 100644 Misc/NEWS.d/next/Library/2018-03-18-16-48-23.bpo-33097.Yl4gI2.rst diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 63f22cf..ce7d642 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -423,6 +423,10 @@ def _queue_management_worker(executor_reference, # - The executor that owns this worker has been shutdown. if shutting_down(): try: + # Flag the executor as shutting down as early as possible if it + # is not gc-ed yet. + if executor is not None: + executor._shutdown_thread = True # Since no new work items can be added, it is safe to shutdown # this thread if there are no pending work items. if not pending_work_items: @@ -595,6 +599,9 @@ class ProcessPoolExecutor(_base.Executor): raise BrokenProcessPool(self._broken) if self._shutdown_thread: raise RuntimeError('cannot schedule new futures after shutdown') + if _global_shutdown: + raise RuntimeError('cannot schedule new futures after ' + 'interpreter shutdown') f = _base.Future() w = _WorkItem(f, fn, args, kwargs) diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py index 6e22950..b65dee1 100644 --- a/Lib/concurrent/futures/thread.py +++ b/Lib/concurrent/futures/thread.py @@ -87,6 +87,10 @@ def _worker(executor_reference, work_queue, initializer, initargs): # - The executor that owns the worker has been collected OR # - The executor that owns the worker has been shutdown. if _shutdown or executor is None or executor._shutdown: + # Flag the executor as shutting down as early as possible if it + # is not gc-ed yet. + if executor is not None: + executor._shutdown = True # Notice other workers work_queue.put(None) return @@ -145,6 +149,9 @@ class ThreadPoolExecutor(_base.Executor): if self._shutdown: raise RuntimeError('cannot schedule new futures after shutdown') + if _shutdown: + raise RuntimeError('cannot schedule new futures after' + 'interpreter shutdown') f = _base.Future() w = _WorkItem(f, fn, args, kwargs) diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 18d0265..b258a0e 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -303,6 +303,34 @@ class ExecutorShutdownTest: self.assertFalse(err) self.assertEqual(out.strip(), b"apple") + def test_submit_after_interpreter_shutdown(self): + # Test the atexit hook for shutdown of worker threads and processes + rc, out, err = assert_python_ok('-c', """if 1: + import atexit + @atexit.register + def run_last(): + try: + t.submit(id, None) + except RuntimeError: + print("runtime-error") + raise + from concurrent.futures import {executor_type} + if __name__ == "__main__": + context = '{context}' + if not context: + t = {executor_type}(5) + else: + from multiprocessing import get_context + context = get_context(context) + t = {executor_type}(5, mp_context=context) + t.submit(id, 42).result() + """.format(executor_type=self.executor_type.__name__, + context=getattr(self, "ctx", ""))) + # Errors in atexit hooks don't change the process exit code, check + # stderr manually. + self.assertIn("RuntimeError: cannot schedule new futures", err.decode()) + self.assertEqual(out.strip(), b"runtime-error") + def test_hang_issue12364(self): fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)] self.executor.shutdown() diff --git a/Misc/NEWS.d/next/Library/2018-03-18-16-48-23.bpo-33097.Yl4gI2.rst b/Misc/NEWS.d/next/Library/2018-03-18-16-48-23.bpo-33097.Yl4gI2.rst new file mode 100644 index 0000000..d9411eb --- /dev/null +++ b/Misc/NEWS.d/next/Library/2018-03-18-16-48-23.bpo-33097.Yl4gI2.rst @@ -0,0 +1,2 @@ +Raise RuntimeError when ``executor.submit`` is called during interpreter +shutdown. -- cgit v0.12