diff options
author | Victor Stinner <victor.stinner@gmail.com> | 2017-07-26 02:48:56 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-07-26 02:48:56 (GMT) |
commit | d0adfb25c5082046133a18fd185375508c1c334f (patch) | |
tree | 873d9be1f673242bf921bd4e169663c9d8170995 /Lib/test | |
parent | efe9fcbd2ca85ba9d6af6d95cc530a9c332f37c5 (diff) | |
download | cpython-d0adfb25c5082046133a18fd185375508c1c334f.zip cpython-d0adfb25c5082046133a18fd185375508c1c334f.tar.gz cpython-d0adfb25c5082046133a18fd185375508c1c334f.tar.bz2 |
[3.6] bpo-26762, bpo-31019: Backport multiprocessing fixes from master to 3.6 (#2879)
* bpo-26762: Avoid daemon process in _test_multiprocessing (#2842)
test_level() of _test_multiprocessing._TestLogging now uses regular
processes rather than daemon processes to prevent zombie processes
(to not "leak" processes).
(cherry picked from commit 06634950c553f8df83330ed468c11483b857b7dc)
* test_multiprocessing: Fix dangling process/thread (#2850)
bpo-26762: Fix more dangling processes and threads in
test_multiprocessing:
* Queue: call close() followed by join_thread()
* Process: call join() or self.addCleanup(p.join)
(cherry picked from commit d7e64d9934d86aa6173229de5af5fe908662a33a)
* test_multiprocessing detects dangling per test case (#2841)
bpo-26762: test_multiprocessing now detects dangling processes and
threads per test case classes:
* setUpClass()/tearDownClass() of mixin classes now check if
multiprocessing.process._dangling or threading._dangling was
modified to detect "dangling" processes and threads.
* ManagerMixin.tearDownClass() now also emits a warning if it still
has more than one active child process after 5 seconds.
* tearDownModule() now checks for dangling processes and threads
before sleeping 500 ms. And it now only sleeps if there is at least one
dangling process or thread.
(cherry picked from commit ffb49408f0780ae80a553208aa133bc5bb3ba129)
* bpo-26762: test_multiprocessing close more queues (#2855)
* Close explicitly queues to make sure that we don't leave dangling
threads
* test_queue_in_process(): remove unused queue
* test_access() joins also the process to fix a random warning
(cherry picked from commit b4c52966c810b5c5e088fceff403247f610b7d13)
* bpo-31019: Fix multiprocessing.Process.is_alive() (#2875)
multiprocessing.Process.is_alive() now removes the process from the
_children set if the process completed.
The change prevents leaking "dangling" processes.
(cherry picked from commit 2db64823c20538a6cfc6033661fab5711d2d4585)
Diffstat (limited to 'Lib/test')
-rw-r--r-- | Lib/test/_test_multiprocessing.py | 125 |
1 files changed, 103 insertions, 22 deletions
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 871a34e..4d3c655 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -32,11 +32,12 @@ test.support.import_module('multiprocessing.synchronize') # without thread support. import threading -import multiprocessing.dummy import multiprocessing.connection -import multiprocessing.managers +import multiprocessing.dummy import multiprocessing.heap +import multiprocessing.managers import multiprocessing.pool +import multiprocessing.queues from multiprocessing import util @@ -64,6 +65,13 @@ except ImportError: def latin(s): return s.encode('latin') + +def close_queue(queue): + if isinstance(queue, multiprocessing.queues.Queue): + queue.close() + queue.join_thread() + + # # Constants # @@ -275,6 +283,7 @@ class _TestProcess(BaseTestCase): self.assertEqual(p.exitcode, 0) self.assertEqual(p.is_alive(), False) self.assertNotIn(p, self.active_children()) + close_queue(q) @classmethod def _test_terminate(cls): @@ -414,6 +423,7 @@ class _TestProcess(BaseTestCase): p.join() self.assertIs(wr(), None) self.assertEqual(q.get(), 5) + close_queue(q) # @@ -600,6 +610,7 @@ class _TestQueue(BaseTestCase): self.assertEqual(queue_full(queue, MAXSIZE), False) proc.join() + close_queue(queue) @classmethod def _test_get(cls, queue, child_can_start, parent_can_continue): @@ -662,6 +673,7 @@ class _TestQueue(BaseTestCase): self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3) proc.join() + close_queue(queue) @classmethod def _test_fork(cls, queue): @@ -697,6 +709,7 @@ class _TestQueue(BaseTestCase): self.assertRaises(pyqueue.Empty, queue.get, False) p.join() + close_queue(queue) def test_qsize(self): q = self.Queue() @@ -712,6 +725,7 @@ class _TestQueue(BaseTestCase): self.assertEqual(q.qsize(), 1) q.get() self.assertEqual(q.qsize(), 0) + close_queue(q) @classmethod def _test_task_done(cls, q): @@ -739,6 +753,7 @@ class _TestQueue(BaseTestCase): for p in workers: p.join() + 
close_queue(queue) def test_no_import_lock_contention(self): with test.support.temp_cwd(): @@ -769,6 +784,7 @@ class _TestQueue(BaseTestCase): # Tolerate a delta of 30 ms because of the bad clock resolution on # Windows (usually 15.6 ms) self.assertGreaterEqual(delta, 0.170) + close_queue(q) def test_queue_feeder_donot_stop_onexc(self): # bpo-30414: verify feeder handles exceptions correctly @@ -782,7 +798,9 @@ class _TestQueue(BaseTestCase): q = self.Queue() q.put(NotSerializable()) q.put(True) - self.assertTrue(q.get(timeout=0.1)) + # bpo-30595: use a timeout of 1 second for slow buildbots + self.assertTrue(q.get(timeout=1.0)) + close_queue(q) # # @@ -895,10 +913,12 @@ class _TestCondition(BaseTestCase): p = self.Process(target=self.f, args=(cond, sleeping, woken)) p.daemon = True p.start() + self.addCleanup(p.join) p = threading.Thread(target=self.f, args=(cond, sleeping, woken)) p.daemon = True p.start() + self.addCleanup(p.join) # wait for both children to start sleeping sleeping.acquire() @@ -941,11 +961,13 @@ class _TestCondition(BaseTestCase): args=(cond, sleeping, woken, TIMEOUT1)) p.daemon = True p.start() + self.addCleanup(p.join) t = threading.Thread(target=self.f, args=(cond, sleeping, woken, TIMEOUT1)) t.daemon = True t.start() + self.addCleanup(t.join) # wait for them all to sleep for i in range(6): @@ -964,10 +986,12 @@ class _TestCondition(BaseTestCase): p = self.Process(target=self.f, args=(cond, sleeping, woken)) p.daemon = True p.start() + self.addCleanup(p.join) t = threading.Thread(target=self.f, args=(cond, sleeping, woken)) t.daemon = True t.start() + self.addCleanup(t.join) # wait for them to all sleep for i in range(6): @@ -1143,6 +1167,7 @@ class _TestEvent(BaseTestCase): p.daemon = True p.start() self.assertEqual(wait(), True) + p.join() # # Tests for Barrier - adapted from tests in test/lock_tests.py @@ -1318,6 +1343,7 @@ class _TestBarrier(BaseTestCase): self.run_threads(self._test_wait_return_f, (self.barrier, queue)) results = 
[queue.get() for i in range(self.N)] self.assertEqual(results.count(0), 1) + close_queue(queue) @classmethod def _test_action_f(cls, barrier, results): @@ -1488,6 +1514,7 @@ class _TestBarrier(BaseTestCase): p = self.Process(target=self._test_thousand_f, args=(self.barrier, passes, child_conn, lock)) p.start() + self.addCleanup(p.join) for i in range(passes): for j in range(self.N): @@ -2971,6 +2998,8 @@ class _TestPicklingConnections(BaseTestCase): w.close() self.assertEqual(conn.recv(), 'foobar'*2) + p.join() + # # # @@ -3296,16 +3325,16 @@ class _TestLogging(BaseTestCase): logger.setLevel(LEVEL1) p = self.Process(target=self._test_level, args=(writer,)) - p.daemon = True p.start() self.assertEqual(LEVEL1, reader.recv()) + p.join() logger.setLevel(logging.NOTSET) root_logger.setLevel(LEVEL2) p = self.Process(target=self._test_level, args=(writer,)) - p.daemon = True p.start() self.assertEqual(LEVEL2, reader.recv()) + p.join() root_logger.setLevel(root_level) logger.setLevel(level=LOG_LEVEL) @@ -3459,7 +3488,7 @@ def _this_sub_process(q): except pyqueue.Empty: pass -def _test_process(q): +def _test_process(): queue = multiprocessing.Queue() subProc = multiprocessing.Process(target=_this_sub_process, args=(queue,)) subProc.daemon = True @@ -3499,8 +3528,7 @@ class _file_like(object): class TestStdinBadfiledescriptor(unittest.TestCase): def test_queue_in_process(self): - queue = multiprocessing.Queue() - proc = multiprocessing.Process(target=_test_process, args=(queue,)) + proc = multiprocessing.Process(target=_test_process) proc.start() proc.join() @@ -4108,7 +4136,32 @@ class TestSimpleQueue(unittest.TestCase): # Mixins # -class ProcessesMixin(object): +class BaseMixin(object): + @classmethod + def setUpClass(cls): + cls.dangling = (multiprocessing.process._dangling.copy(), + threading._dangling.copy()) + + @classmethod + def tearDownClass(cls): + # bpo-26762: Some multiprocessing objects like Pool create reference + # cycles. 
Trigger a garbage collection to break these cycles. + test.support.gc_collect() + + processes = set(multiprocessing.process._dangling) - set(cls.dangling[0]) + if processes: + print('Warning -- Dangling processes: %s' % processes, + file=sys.stderr) + processes = None + + threads = set(threading._dangling) - set(cls.dangling[1]) + if threads: + print('Warning -- Dangling threads: %s' % threads, + file=sys.stderr) + threads = None + + +class ProcessesMixin(BaseMixin): TYPE = 'processes' Process = multiprocessing.Process connection = multiprocessing.connection @@ -4131,7 +4184,7 @@ class ProcessesMixin(object): RawArray = staticmethod(multiprocessing.RawArray) -class ManagerMixin(object): +class ManagerMixin(BaseMixin): TYPE = 'manager' Process = multiprocessing.Process Queue = property(operator.attrgetter('manager.Queue')) @@ -4155,6 +4208,7 @@ class ManagerMixin(object): @classmethod def setUpClass(cls): + super().setUpClass() cls.manager = multiprocessing.Manager() @classmethod @@ -4162,23 +4216,35 @@ class ManagerMixin(object): # only the manager process should be returned by active_children() # but this can take a bit on slow machines, so wait a few seconds # if there are other children too (see #17395) + start_time = time.monotonic() t = 0.01 - while len(multiprocessing.active_children()) > 1 and t < 5: + while len(multiprocessing.active_children()) > 1: time.sleep(t) t *= 2 + dt = time.monotonic() - start_time + if dt >= 5.0: + print("Warning -- multiprocessing.Manager still has %s active " + "children after %s seconds" + % (multiprocessing.active_children(), dt), + file=sys.stderr) + break + gc.collect() # do garbage collection if cls.manager._number_of_objects() != 0: # This is not really an error since some tests do not # ensure that all processes which hold a reference to a # managed object have been joined. 
- print('Shared objects which still exist at manager shutdown:') + print('Warning -- Shared objects which still exist at manager ' + 'shutdown:') print(cls.manager._debug_info()) cls.manager.shutdown() cls.manager.join() cls.manager = None + super().tearDownClass() + -class ThreadsMixin(object): +class ThreadsMixin(BaseMixin): TYPE = 'threads' Process = multiprocessing.dummy.Process connection = multiprocessing.dummy.connection @@ -4255,18 +4321,33 @@ def install_tests_in_module_dict(remote_globs, start_method): multiprocessing.get_logger().setLevel(LOG_LEVEL) def tearDownModule(): + need_sleep = False + + # bpo-26762: Some multiprocessing objects like Pool create reference + # cycles. Trigger a garbage collection to break these cycles. + test.support.gc_collect() + multiprocessing.set_start_method(old_start_method[0], force=True) # pause a bit so we don't get warning about dangling threads/processes - time.sleep(0.5) + processes = set(multiprocessing.process._dangling) - set(dangling[0]) + if processes: + need_sleep = True + print('Warning -- Dangling processes: %s' % processes, + file=sys.stderr) + processes = None + + threads = set(threading._dangling) - set(dangling[1]) + if threads: + need_sleep = True + print('Warning -- Dangling threads: %s' % threads, + file=sys.stderr) + threads = None + + # Sleep 500 ms to give time to child processes to complete. + if need_sleep: + time.sleep(0.5) multiprocessing.process._cleanup() - gc.collect() - tmp = set(multiprocessing.process._dangling) - set(dangling[0]) - if tmp: - print('Dangling processes:', tmp, file=sys.stderr) - del tmp - tmp = set(threading._dangling) - set(dangling[1]) - if tmp: - print('Dangling threads:', tmp, file=sys.stderr) + test.support.gc_collect() remote_globs['setUpModule'] = setUpModule remote_globs['tearDownModule'] = tearDownModule |