From aebac0b55a1e3addb93ec7992046a4f9561b4175 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Thu, 24 Mar 2011 15:47:39 +0100 Subject: Add tests for the atexit hook in concurrent.futures (part of #11635) --- Lib/test/test_concurrent_futures.py | 55 +++++++++++++++++++++++++++---------- 1 file changed, 40 insertions(+), 15 deletions(-) diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index f639eec..2662af7 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -9,6 +9,9 @@ test.support.import_module('multiprocessing.synchronize') # without thread support. test.support.import_module('threading') +from test.script_helper import assert_python_ok + +import sys import threading import time import unittest @@ -43,9 +46,30 @@ def sleep_and_raise(t): time.sleep(t) raise Exception('this is an exception') +def sleep_and_print(t, msg): + time.sleep(t) + print(msg) + sys.stdout.flush() + class ExecutorMixin: worker_count = 5 + + def setUp(self): + self.t1 = time.time() + try: + self.executor = self.executor_type(max_workers=self.worker_count) + except NotImplementedError as e: + self.skipTest(str(e)) + self._prime_executor() + + def tearDown(self): + self.executor.shutdown(wait=True) + dt = time.time() - self.t1 + if test.support.verbose: + print("%.2fs" % dt, end=' ') + self.assertLess(dt, 60, "synchronization issue: test lasted too long") + def _prime_executor(self): # Make sure that the executor is ready to do work before running the # tests. This should reduce the probability of timeouts in the tests. @@ -57,24 +81,11 @@ class ExecutorMixin: class ThreadPoolMixin(ExecutorMixin): - def setUp(self): - self.executor = futures.ThreadPoolExecutor(max_workers=5) - self._prime_executor() - - def tearDown(self): - self.executor.shutdown(wait=True) + executor_type = futures.ThreadPoolExecutor class ProcessPoolMixin(ExecutorMixin): - def setUp(self): - try: - self.executor = futures.ProcessPoolExecutor(max_workers=5) - except NotImplementedError as e: - self.skipTest(str(e)) - self._prime_executor() - - def tearDown(self): - self.executor.shutdown(wait=True) + executor_type = futures.ProcessPoolExecutor class ExecutorShutdownTest(unittest.TestCase): @@ -84,6 +95,20 @@ class ExecutorShutdownTest(unittest.TestCase): self.executor.submit, pow, 2, 5) + def test_interpreter_shutdown(self): + # Test the atexit hook for shutdown of worker threads and processes + rc, out, err = assert_python_ok('-c', """if 1: + from concurrent.futures import {executor_type} + from time import sleep + from test.test_concurrent_futures import sleep_and_print + t = {executor_type}(5) + t.submit(sleep_and_print, 1.0, "apple") + """.format(executor_type=self.executor_type.__name__)) + # Errors in atexit hooks don't change the process exit code, check + # stderr manually. + self.assertFalse(err) + self.assertEqual(out.strip(), b"apple") + class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest): def _prime_executor(self): -- cgit v0.12