diff options
author | Guido van Rossum <guido@dropbox.com> | 2013-11-04 23:50:46 (GMT) |
---|---|---|
committer | Guido van Rossum <guido@dropbox.com> | 2013-11-04 23:50:46 (GMT) |
commit | 0eaa5ac9b5b47d4ae0e851a8652fdc6be7a15963 (patch) | |
tree | 4d8d97d49e5a5aa9aa03df73450545ab4d780943 /Lib/test/test_asyncio/test_unix_events.py | |
parent | ccea08462b753fc78ec97cc5717de8f163b503ec (diff) | |
download | cpython-0eaa5ac9b5b47d4ae0e851a8652fdc6be7a15963.zip cpython-0eaa5ac9b5b47d4ae0e851a8652fdc6be7a15963.tar.gz cpython-0eaa5ac9b5b47d4ae0e851a8652fdc6be7a15963.tar.bz2 |
asyncio: Refactor SIGCHLD handling. By Anthony Baire.
Diffstat (limited to 'Lib/test/test_asyncio/test_unix_events.py')
-rw-r--r-- | Lib/test/test_asyncio/test_unix_events.py | 987 |
1 files changed, 869 insertions, 118 deletions
diff --git a/Lib/test/test_asyncio/test_unix_events.py b/Lib/test/test_asyncio/test_unix_events.py index f29e7af..a4d835e 100644 --- a/Lib/test/test_asyncio/test_unix_events.py +++ b/Lib/test/test_asyncio/test_unix_events.py @@ -3,10 +3,12 @@ import gc import errno import io +import os import pprint import signal import stat import sys +import threading import unittest import unittest.mock @@ -181,124 +183,6 @@ class SelectorEventLoopTests(unittest.TestCase): self.assertRaises( RuntimeError, self.loop.remove_signal_handler, signal.SIGHUP) - @unittest.mock.patch('os.WTERMSIG') - @unittest.mock.patch('os.WEXITSTATUS') - @unittest.mock.patch('os.WIFSIGNALED') - @unittest.mock.patch('os.WIFEXITED') - @unittest.mock.patch('os.waitpid') - def test__sig_chld(self, m_waitpid, m_WIFEXITED, m_WIFSIGNALED, - m_WEXITSTATUS, m_WTERMSIG): - m_waitpid.side_effect = [(7, object()), ChildProcessError] - m_WIFEXITED.return_value = True - m_WIFSIGNALED.return_value = False - m_WEXITSTATUS.return_value = 3 - transp = unittest.mock.Mock() - self.loop._subprocesses[7] = transp - - self.loop._sig_chld() - transp._process_exited.assert_called_with(3) - self.assertFalse(m_WTERMSIG.called) - - @unittest.mock.patch('os.WTERMSIG') - @unittest.mock.patch('os.WEXITSTATUS') - @unittest.mock.patch('os.WIFSIGNALED') - @unittest.mock.patch('os.WIFEXITED') - @unittest.mock.patch('os.waitpid') - def test__sig_chld_signal(self, m_waitpid, m_WIFEXITED, m_WIFSIGNALED, - m_WEXITSTATUS, m_WTERMSIG): - m_waitpid.side_effect = [(7, object()), ChildProcessError] - m_WIFEXITED.return_value = False - m_WIFSIGNALED.return_value = True - m_WTERMSIG.return_value = 1 - transp = unittest.mock.Mock() - self.loop._subprocesses[7] = transp - - self.loop._sig_chld() - transp._process_exited.assert_called_with(-1) - self.assertFalse(m_WEXITSTATUS.called) - - @unittest.mock.patch('os.WTERMSIG') - @unittest.mock.patch('os.WEXITSTATUS') - @unittest.mock.patch('os.WIFSIGNALED') - @unittest.mock.patch('os.WIFEXITED') - @unittest.mock.patch('os.waitpid') - def test__sig_chld_zero_pid(self, m_waitpid, m_WIFEXITED, m_WIFSIGNALED, - m_WEXITSTATUS, m_WTERMSIG): - m_waitpid.side_effect = [(0, object()), ChildProcessError] - transp = unittest.mock.Mock() - self.loop._subprocesses[7] = transp - - self.loop._sig_chld() - self.assertFalse(transp._process_exited.called) - self.assertFalse(m_WIFSIGNALED.called) - self.assertFalse(m_WIFEXITED.called) - self.assertFalse(m_WTERMSIG.called) - self.assertFalse(m_WEXITSTATUS.called) - - @unittest.mock.patch('os.WTERMSIG') - @unittest.mock.patch('os.WEXITSTATUS') - @unittest.mock.patch('os.WIFSIGNALED') - @unittest.mock.patch('os.WIFEXITED') - @unittest.mock.patch('os.waitpid') - def test__sig_chld_not_registered_subprocess(self, m_waitpid, - m_WIFEXITED, m_WIFSIGNALED, - m_WEXITSTATUS, m_WTERMSIG): - m_waitpid.side_effect = [(7, object()), ChildProcessError] - m_WIFEXITED.return_value = True - m_WIFSIGNALED.return_value = False - m_WEXITSTATUS.return_value = 3 - - self.loop._sig_chld() - self.assertFalse(m_WTERMSIG.called) - - @unittest.mock.patch('os.WTERMSIG') - @unittest.mock.patch('os.WEXITSTATUS') - @unittest.mock.patch('os.WIFSIGNALED') - @unittest.mock.patch('os.WIFEXITED') - @unittest.mock.patch('os.waitpid') - def test__sig_chld_unknown_status(self, m_waitpid, - m_WIFEXITED, m_WIFSIGNALED, - m_WEXITSTATUS, m_WTERMSIG): - m_waitpid.side_effect = [(7, object()), ChildProcessError] - m_WIFEXITED.return_value = False - m_WIFSIGNALED.return_value = False - transp = unittest.mock.Mock() - self.loop._subprocesses[7] = transp - - self.loop._sig_chld() - self.assertTrue(transp._process_exited.called) - self.assertFalse(m_WEXITSTATUS.called) - self.assertFalse(m_WTERMSIG.called) - - @unittest.mock.patch('asyncio.unix_events.logger') - @unittest.mock.patch('os.WTERMSIG') - @unittest.mock.patch('os.WEXITSTATUS') - @unittest.mock.patch('os.WIFSIGNALED') - @unittest.mock.patch('os.WIFEXITED') - @unittest.mock.patch('os.waitpid') - def test__sig_chld_unknown_status_in_handler(self, m_waitpid, - m_WIFEXITED, m_WIFSIGNALED, - m_WEXITSTATUS, m_WTERMSIG, - m_log): - m_waitpid.side_effect = Exception - transp = unittest.mock.Mock() - self.loop._subprocesses[7] = transp - - self.loop._sig_chld() - self.assertFalse(transp._process_exited.called) - self.assertFalse(m_WIFSIGNALED.called) - self.assertFalse(m_WIFEXITED.called) - self.assertFalse(m_WTERMSIG.called) - self.assertFalse(m_WEXITSTATUS.called) - m_log.exception.assert_called_with( - 'Unknown exception in SIGCHLD handler') - - @unittest.mock.patch('os.waitpid') - def test__sig_chld_process_error(self, m_waitpid): - m_waitpid.side_effect = ChildProcessError - self.loop._sig_chld() - self.assertTrue(m_waitpid.called) - class UnixReadPipeTransportTests(unittest.TestCase): @@ -777,5 +661,872 @@ class UnixWritePipeTransportTests(unittest.TestCase): self.assertFalse(self.protocol.connection_lost.called) +class AbstractChildWatcherTests(unittest.TestCase): + + def test_not_implemented(self): + f = unittest.mock.Mock() + watcher = unix_events.AbstractChildWatcher() + self.assertRaises( + NotImplementedError, watcher.add_child_handler, f, f) + self.assertRaises( + NotImplementedError, watcher.remove_child_handler, f) + self.assertRaises( + NotImplementedError, watcher.set_loop, f) + self.assertRaises( + NotImplementedError, watcher.close) + self.assertRaises( + NotImplementedError, watcher.__enter__) + self.assertRaises( + NotImplementedError, watcher.__exit__, f, f, f) + + +class BaseChildWatcherTests(unittest.TestCase): + + def test_not_implemented(self): + f = unittest.mock.Mock() + watcher = unix_events.BaseChildWatcher(None) + self.assertRaises( + NotImplementedError, watcher._do_waitpid, f) + + +class ChildWatcherTestsMixin: + instance = None + + ignore_warnings = unittest.mock.patch.object(unix_events.logger, "warning") + + def setUp(self): + self.loop = test_utils.TestLoop() + self.running = False + self.zombies = {} + + assert ChildWatcherTestsMixin.instance is None + ChildWatcherTestsMixin.instance = self + + with unittest.mock.patch.object( + self.loop, "add_signal_handler") as self.m_add_signal_handler: + self.watcher = self.create_watcher(self.loop) + + def tearDown(self): + ChildWatcherTestsMixin.instance = None + + def waitpid(pid, flags): + self = ChildWatcherTestsMixin.instance + if isinstance(self.watcher, unix_events.SafeChildWatcher) or pid != -1: + self.assertGreater(pid, 0) + try: + if pid < 0: + return self.zombies.popitem() + else: + return pid, self.zombies.pop(pid) + except KeyError: + pass + if self.running: + return 0, 0 + else: + raise ChildProcessError() + + def add_zombie(self, pid, returncode): + self.zombies[pid] = returncode + 32768 + + def WIFEXITED(status): + return status >= 32768 + + def WIFSIGNALED(status): + return 32700 < status < 32768 + + def WEXITSTATUS(status): + self = ChildWatcherTestsMixin.instance + self.assertTrue(type(self).WIFEXITED(status)) + return status - 32768 + + def WTERMSIG(status): + self = ChildWatcherTestsMixin.instance + self.assertTrue(type(self).WIFSIGNALED(status)) + return 32768 - status + + def test_create_watcher(self): + self.m_add_signal_handler.assert_called_once_with( + signal.SIGCHLD, self.watcher._sig_chld) + + @unittest.mock.patch('os.WTERMSIG', wraps=WTERMSIG) + @unittest.mock.patch('os.WEXITSTATUS', wraps=WEXITSTATUS) + @unittest.mock.patch('os.WIFSIGNALED', wraps=WIFSIGNALED) + @unittest.mock.patch('os.WIFEXITED', wraps=WIFEXITED) + @unittest.mock.patch('os.waitpid', wraps=waitpid) + def test_sigchld(self, m_waitpid, m_WIFEXITED, m_WIFSIGNALED, + m_WEXITSTATUS, m_WTERMSIG): + # register a child + callback = unittest.mock.Mock() + + with self.watcher: + self.running = True + self.watcher.add_child_handler(42, callback, 9, 10, 14) + + self.assertFalse(callback.called) + self.assertFalse(m_WIFEXITED.called) + self.assertFalse(m_WIFSIGNALED.called) + self.assertFalse(m_WEXITSTATUS.called) + self.assertFalse(m_WTERMSIG.called) + + # child is running + self.watcher._sig_chld() + + self.assertFalse(callback.called) + self.assertFalse(m_WIFEXITED.called) + self.assertFalse(m_WIFSIGNALED.called) + self.assertFalse(m_WEXITSTATUS.called) + self.assertFalse(m_WTERMSIG.called) + + # child terminates (returncode 12) + self.running = False + self.add_zombie(42, 12) + self.watcher._sig_chld() + + self.assertTrue(m_WIFEXITED.called) + self.assertTrue(m_WEXITSTATUS.called) + self.assertFalse(m_WTERMSIG.called) + callback.assert_called_once_with(42, 12, 9, 10, 14) + + m_WIFSIGNALED.reset_mock() + m_WIFEXITED.reset_mock() + m_WEXITSTATUS.reset_mock() + callback.reset_mock() + + # ensure that the child is effectively reaped + self.add_zombie(42, 13) + with self.ignore_warnings: + self.watcher._sig_chld() + + self.assertFalse(callback.called) + self.assertFalse(m_WTERMSIG.called) + + m_WIFSIGNALED.reset_mock() + m_WIFEXITED.reset_mock() + m_WEXITSTATUS.reset_mock() + + # sigchld called again + self.zombies.clear() + self.watcher._sig_chld() + + self.assertFalse(callback.called) + self.assertFalse(m_WIFEXITED.called) + self.assertFalse(m_WIFSIGNALED.called) + self.assertFalse(m_WEXITSTATUS.called) + self.assertFalse(m_WTERMSIG.called) + + @unittest.mock.patch('os.WTERMSIG', wraps=WTERMSIG) + @unittest.mock.patch('os.WEXITSTATUS', wraps=WEXITSTATUS) + @unittest.mock.patch('os.WIFSIGNALED', wraps=WIFSIGNALED) + @unittest.mock.patch('os.WIFEXITED', wraps=WIFEXITED) + @unittest.mock.patch('os.waitpid', wraps=waitpid) + def test_sigchld_two_children(self, m_waitpid, m_WIFEXITED, m_WIFSIGNALED, + m_WEXITSTATUS, m_WTERMSIG): + callback1 = unittest.mock.Mock() + callback2 = unittest.mock.Mock() + + # register child 1 + with self.watcher: + self.running = True + self.watcher.add_child_handler(43, callback1, 7, 8) + + self.assertFalse(callback1.called) + self.assertFalse(callback2.called) + self.assertFalse(m_WIFEXITED.called) + self.assertFalse(m_WIFSIGNALED.called) + self.assertFalse(m_WEXITSTATUS.called) + self.assertFalse(m_WTERMSIG.called) + + # register child 2 + with self.watcher: + self.watcher.add_child_handler(44, callback2, 147, 18) + + self.assertFalse(callback1.called) + self.assertFalse(callback2.called) + self.assertFalse(m_WIFEXITED.called) + self.assertFalse(m_WIFSIGNALED.called) + self.assertFalse(m_WEXITSTATUS.called) + self.assertFalse(m_WTERMSIG.called) + + # childen are running + self.watcher._sig_chld() + + self.assertFalse(callback1.called) + self.assertFalse(callback2.called) + self.assertFalse(m_WIFEXITED.called) + self.assertFalse(m_WIFSIGNALED.called) + self.assertFalse(m_WEXITSTATUS.called) + self.assertFalse(m_WTERMSIG.called) + + # child 1 terminates (signal 3) + self.add_zombie(43, -3) + self.watcher._sig_chld() + + callback1.assert_called_once_with(43, -3, 7, 8) + self.assertFalse(callback2.called) + self.assertTrue(m_WIFSIGNALED.called) + self.assertFalse(m_WEXITSTATUS.called) + self.assertTrue(m_WTERMSIG.called) + + m_WIFSIGNALED.reset_mock() + m_WIFEXITED.reset_mock() + m_WTERMSIG.reset_mock() + callback1.reset_mock() + + # child 2 still running + self.watcher._sig_chld() + + self.assertFalse(callback1.called) + self.assertFalse(callback2.called) + self.assertFalse(m_WIFEXITED.called) + self.assertFalse(m_WIFSIGNALED.called) + self.assertFalse(m_WEXITSTATUS.called) + self.assertFalse(m_WTERMSIG.called) + + # child 2 terminates (code 108) + self.add_zombie(44, 108) + self.running = False + self.watcher._sig_chld() + + callback2.assert_called_once_with(44, 108, 147, 18) + self.assertFalse(callback1.called) + self.assertTrue(m_WIFEXITED.called) + self.assertTrue(m_WEXITSTATUS.called) + self.assertFalse(m_WTERMSIG.called) + + m_WIFSIGNALED.reset_mock() + m_WIFEXITED.reset_mock() + m_WEXITSTATUS.reset_mock() + callback2.reset_mock() + + # ensure that the children are effectively reaped + self.add_zombie(43, 14) + self.add_zombie(44, 15) + with self.ignore_warnings: + self.watcher._sig_chld() + + self.assertFalse(callback1.called) + self.assertFalse(callback2.called) + self.assertFalse(m_WTERMSIG.called) + + m_WIFSIGNALED.reset_mock() + m_WIFEXITED.reset_mock() + m_WEXITSTATUS.reset_mock() + + # sigchld called again + self.zombies.clear() + self.watcher._sig_chld() + + self.assertFalse(callback1.called) + self.assertFalse(callback2.called) + self.assertFalse(m_WIFEXITED.called) + self.assertFalse(m_WIFSIGNALED.called) + self.assertFalse(m_WEXITSTATUS.called) + self.assertFalse(m_WTERMSIG.called) + + @unittest.mock.patch('os.WTERMSIG', wraps=WTERMSIG) + @unittest.mock.patch('os.WEXITSTATUS', wraps=WEXITSTATUS) + @unittest.mock.patch('os.WIFSIGNALED', wraps=WIFSIGNALED) + @unittest.mock.patch('os.WIFEXITED', wraps=WIFEXITED) + @unittest.mock.patch('os.waitpid', wraps=waitpid) + def test_sigchld_two_children_terminating_together( + self, m_waitpid, m_WIFEXITED, m_WIFSIGNALED, m_WEXITSTATUS, + m_WTERMSIG): + callback1 = unittest.mock.Mock() + callback2 = unittest.mock.Mock() + + # register child 1 + with self.watcher: + self.running = True + self.watcher.add_child_handler(45, callback1, 17, 8) + + self.assertFalse(callback1.called) + self.assertFalse(callback2.called) + self.assertFalse(m_WIFEXITED.called) + self.assertFalse(m_WIFSIGNALED.called) + self.assertFalse(m_WEXITSTATUS.called) + self.assertFalse(m_WTERMSIG.called) + + # register child 2 + with self.watcher: + self.watcher.add_child_handler(46, callback2, 1147, 18) + + self.assertFalse(callback1.called) + self.assertFalse(callback2.called) + self.assertFalse(m_WIFEXITED.called) + self.assertFalse(m_WIFSIGNALED.called) + self.assertFalse(m_WEXITSTATUS.called) + self.assertFalse(m_WTERMSIG.called) + + # childen are running + self.watcher._sig_chld() + + self.assertFalse(callback1.called) + self.assertFalse(callback2.called) + self.assertFalse(m_WIFEXITED.called) + self.assertFalse(m_WIFSIGNALED.called) + self.assertFalse(m_WEXITSTATUS.called) + self.assertFalse(m_WTERMSIG.called) + + # child 1 terminates (code 78) + # child 2 terminates (signal 5) + self.add_zombie(45, 78) + self.add_zombie(46, -5) + self.running = False + self.watcher._sig_chld() + + callback1.assert_called_once_with(45, 78, 17, 8) + callback2.assert_called_once_with(46, -5, 1147, 18) + self.assertTrue(m_WIFSIGNALED.called) + self.assertTrue(m_WIFEXITED.called) + self.assertTrue(m_WEXITSTATUS.called) + self.assertTrue(m_WTERMSIG.called) + + m_WIFSIGNALED.reset_mock() + m_WIFEXITED.reset_mock() + m_WTERMSIG.reset_mock() + m_WEXITSTATUS.reset_mock() + callback1.reset_mock() + callback2.reset_mock() + + # ensure that the children are effectively reaped + self.add_zombie(45, 14) + self.add_zombie(46, 15) + with self.ignore_warnings: + self.watcher._sig_chld() + + self.assertFalse(callback1.called) + self.assertFalse(callback2.called) + self.assertFalse(m_WTERMSIG.called) + + @unittest.mock.patch('os.WTERMSIG', wraps=WTERMSIG) + @unittest.mock.patch('os.WEXITSTATUS', wraps=WEXITSTATUS) + @unittest.mock.patch('os.WIFSIGNALED', wraps=WIFSIGNALED) + @unittest.mock.patch('os.WIFEXITED', wraps=WIFEXITED) + @unittest.mock.patch('os.waitpid', wraps=waitpid) + def test_sigchld_race_condition( + self, m_waitpid, m_WIFEXITED, m_WIFSIGNALED, m_WEXITSTATUS, + m_WTERMSIG): + # register a child + callback = unittest.mock.Mock() + + with self.watcher: + # child terminates before being registered + self.add_zombie(50, 4) + self.watcher._sig_chld() + + self.watcher.add_child_handler(50, callback, 1, 12) + + callback.assert_called_once_with(50, 4, 1, 12) + callback.reset_mock() + + # ensure that the child is effectively reaped + self.add_zombie(50, -1) + with self.ignore_warnings: + self.watcher._sig_chld() + + self.assertFalse(callback.called) + + @unittest.mock.patch('os.WTERMSIG', wraps=WTERMSIG) + @unittest.mock.patch('os.WEXITSTATUS', wraps=WEXITSTATUS) + @unittest.mock.patch('os.WIFSIGNALED', wraps=WIFSIGNALED) + @unittest.mock.patch('os.WIFEXITED', wraps=WIFEXITED) + @unittest.mock.patch('os.waitpid', wraps=waitpid) + def test_sigchld_replace_handler( + self, m_waitpid, m_WIFEXITED, m_WIFSIGNALED, m_WEXITSTATUS, + m_WTERMSIG): + callback1 = unittest.mock.Mock() + callback2 = unittest.mock.Mock() + + # register a child + with self.watcher: + self.running = True + self.watcher.add_child_handler(51, callback1, 19) + + self.assertFalse(callback1.called) + self.assertFalse(callback2.called) + self.assertFalse(m_WIFEXITED.called) + self.assertFalse(m_WIFSIGNALED.called) + self.assertFalse(m_WEXITSTATUS.called) + self.assertFalse(m_WTERMSIG.called) + + # register the same child again + with self.watcher: + self.watcher.add_child_handler(51, callback2, 21) + + self.assertFalse(callback1.called) + self.assertFalse(callback2.called) + self.assertFalse(m_WIFEXITED.called) + self.assertFalse(m_WIFSIGNALED.called) + self.assertFalse(m_WEXITSTATUS.called) + self.assertFalse(m_WTERMSIG.called) + + # child terminates (signal 8) + self.running = False + self.add_zombie(51, -8) + self.watcher._sig_chld() + + callback2.assert_called_once_with(51, -8, 21) + self.assertFalse(callback1.called) + self.assertTrue(m_WIFSIGNALED.called) + self.assertFalse(m_WEXITSTATUS.called) + self.assertTrue(m_WTERMSIG.called) + + m_WIFSIGNALED.reset_mock() + m_WIFEXITED.reset_mock() + m_WTERMSIG.reset_mock() + callback2.reset_mock() + + # ensure that the child is effectively reaped + self.add_zombie(51, 13) + with self.ignore_warnings: + self.watcher._sig_chld() + + self.assertFalse(callback1.called) + self.assertFalse(callback2.called) + self.assertFalse(m_WTERMSIG.called) + + @unittest.mock.patch('os.WTERMSIG', wraps=WTERMSIG) + @unittest.mock.patch('os.WEXITSTATUS', wraps=WEXITSTATUS) + @unittest.mock.patch('os.WIFSIGNALED', wraps=WIFSIGNALED) + @unittest.mock.patch('os.WIFEXITED', wraps=WIFEXITED) + @unittest.mock.patch('os.waitpid', wraps=waitpid) + def test_sigchld_remove_handler(self, m_waitpid, m_WIFEXITED, + m_WIFSIGNALED, m_WEXITSTATUS, m_WTERMSIG): + callback = unittest.mock.Mock() + + # register a child + with self.watcher: + self.running = True + self.watcher.add_child_handler(52, callback, 1984) + + self.assertFalse(callback.called) + self.assertFalse(m_WIFEXITED.called) + self.assertFalse(m_WIFSIGNALED.called) + self.assertFalse(m_WEXITSTATUS.called) + self.assertFalse(m_WTERMSIG.called) + + # unregister the child + self.watcher.remove_child_handler(52) + + self.assertFalse(callback.called) + self.assertFalse(m_WIFEXITED.called) + self.assertFalse(m_WIFSIGNALED.called) + self.assertFalse(m_WEXITSTATUS.called) + self.assertFalse(m_WTERMSIG.called) + + # child terminates (code 99) + self.running = False + self.add_zombie(52, 99) + with self.ignore_warnings: + self.watcher._sig_chld() + + self.assertFalse(callback.called) + + @unittest.mock.patch('os.WTERMSIG', wraps=WTERMSIG) + @unittest.mock.patch('os.WEXITSTATUS', wraps=WEXITSTATUS) + @unittest.mock.patch('os.WIFSIGNALED', wraps=WIFSIGNALED) + @unittest.mock.patch('os.WIFEXITED', wraps=WIFEXITED) + @unittest.mock.patch('os.waitpid', wraps=waitpid) + def test_sigchld_unknown_status(self, m_waitpid, m_WIFEXITED, + m_WIFSIGNALED, m_WEXITSTATUS, m_WTERMSIG): + callback = unittest.mock.Mock() + + # register a child + with self.watcher: + self.running = True + self.watcher.add_child_handler(53, callback, -19) + + self.assertFalse(callback.called) + self.assertFalse(m_WIFEXITED.called) + self.assertFalse(m_WIFSIGNALED.called) + self.assertFalse(m_WEXITSTATUS.called) + self.assertFalse(m_WTERMSIG.called) + + # terminate with unknown status + self.zombies[53] = 1178 + self.running = False + self.watcher._sig_chld() + + callback.assert_called_once_with(53, 1178, -19) + self.assertTrue(m_WIFEXITED.called) + self.assertTrue(m_WIFSIGNALED.called) + self.assertFalse(m_WEXITSTATUS.called) + self.assertFalse(m_WTERMSIG.called) + + callback.reset_mock() + m_WIFEXITED.reset_mock() + m_WIFSIGNALED.reset_mock() + + # ensure that the child is effectively reaped + self.add_zombie(53, 101) + with self.ignore_warnings: + self.watcher._sig_chld() + + self.assertFalse(callback.called) + + @unittest.mock.patch('os.WTERMSIG', wraps=WTERMSIG) + @unittest.mock.patch('os.WEXITSTATUS', wraps=WEXITSTATUS) + @unittest.mock.patch('os.WIFSIGNALED', wraps=WIFSIGNALED) + @unittest.mock.patch('os.WIFEXITED', wraps=WIFEXITED) + @unittest.mock.patch('os.waitpid', wraps=waitpid) + def test_remove_child_handler(self, m_waitpid, m_WIFEXITED, + m_WIFSIGNALED, m_WEXITSTATUS, m_WTERMSIG): + callback1 = unittest.mock.Mock() + callback2 = unittest.mock.Mock() + callback3 = unittest.mock.Mock() + + # register children + with self.watcher: + self.running = True + self.watcher.add_child_handler(54, callback1, 1) + self.watcher.add_child_handler(55, callback2, 2) + self.watcher.add_child_handler(56, callback3, 3) + + # remove child handler 1 + self.assertTrue(self.watcher.remove_child_handler(54)) + + # remove child handler 2 multiple times + self.assertTrue(self.watcher.remove_child_handler(55)) + self.assertFalse(self.watcher.remove_child_handler(55)) + self.assertFalse(self.watcher.remove_child_handler(55)) + + # all children terminate + self.add_zombie(54, 0) + self.add_zombie(55, 1) + self.add_zombie(56, 2) + self.running = False + with self.ignore_warnings: + self.watcher._sig_chld() + + self.assertFalse(callback1.called) + self.assertFalse(callback2.called) + callback3.assert_called_once_with(56, 2, 3) + + @unittest.mock.patch('os.waitpid', wraps=waitpid) + def test_sigchld_unhandled_exception(self, m_waitpid): + callback = unittest.mock.Mock() + + # register a child + with self.watcher: + self.running = True + self.watcher.add_child_handler(57, callback) + + # raise an exception + m_waitpid.side_effect = ValueError + + with unittest.mock.patch.object(unix_events.logger, + "exception") as m_exception: + + self.assertEqual(self.watcher._sig_chld(), None) + self.assertTrue(m_exception.called) + + @unittest.mock.patch('os.WTERMSIG', wraps=WTERMSIG) + @unittest.mock.patch('os.WEXITSTATUS', wraps=WEXITSTATUS) + @unittest.mock.patch('os.WIFSIGNALED', wraps=WIFSIGNALED) + @unittest.mock.patch('os.WIFEXITED', wraps=WIFEXITED) + @unittest.mock.patch('os.waitpid', wraps=waitpid) + def test_sigchld_child_reaped_elsewhere( + self, m_waitpid, m_WIFEXITED, m_WIFSIGNALED, m_WEXITSTATUS, + m_WTERMSIG): + + # register a child + callback = unittest.mock.Mock() + + with self.watcher: + self.running = True + self.watcher.add_child_handler(58, callback) + + self.assertFalse(callback.called) + self.assertFalse(m_WIFEXITED.called) + self.assertFalse(m_WIFSIGNALED.called) + self.assertFalse(m_WEXITSTATUS.called) + self.assertFalse(m_WTERMSIG.called) + + # child terminates + self.running = False + self.add_zombie(58, 4) + + # waitpid is called elsewhere + os.waitpid(58, os.WNOHANG) + + m_waitpid.reset_mock() + + # sigchld + with self.ignore_warnings: + self.watcher._sig_chld() + + callback.assert_called(m_waitpid) + if isinstance(self.watcher, unix_events.FastChildWatcher): + # here the FastChildWatche enters a deadlock + # (there is no way to prevent it) + self.assertFalse(callback.called) + else: + callback.assert_called_once_with(58, 255) + + @unittest.mock.patch('os.WTERMSIG', wraps=WTERMSIG) + @unittest.mock.patch('os.WEXITSTATUS', wraps=WEXITSTATUS) + @unittest.mock.patch('os.WIFSIGNALED', wraps=WIFSIGNALED) + @unittest.mock.patch('os.WIFEXITED', wraps=WIFEXITED) + @unittest.mock.patch('os.waitpid', wraps=waitpid) + def test_sigchld_unknown_pid_during_registration( + self, m_waitpid, m_WIFEXITED, m_WIFSIGNALED, m_WEXITSTATUS, + m_WTERMSIG): + + # register two children + callback1 = unittest.mock.Mock() + callback2 = unittest.mock.Mock() + + with self.ignore_warnings, self.watcher: + self.running = True + # child 1 terminates + self.add_zombie(591, 7) + # an unknown child terminates + self.add_zombie(593, 17) + + self.watcher._sig_chld() + + self.watcher.add_child_handler(591, callback1) + self.watcher.add_child_handler(592, callback2) + + callback1.assert_called_once_with(591, 7) + self.assertFalse(callback2.called) + + @unittest.mock.patch('os.WTERMSIG', wraps=WTERMSIG) + @unittest.mock.patch('os.WEXITSTATUS', wraps=WEXITSTATUS) + @unittest.mock.patch('os.WIFSIGNALED', wraps=WIFSIGNALED) + @unittest.mock.patch('os.WIFEXITED', wraps=WIFEXITED) + @unittest.mock.patch('os.waitpid', wraps=waitpid) + def test_set_loop( + self, m_waitpid, m_WIFEXITED, m_WIFSIGNALED, m_WEXITSTATUS, + m_WTERMSIG): + + # register a child + callback = unittest.mock.Mock() + + with self.watcher: + self.running = True + self.watcher.add_child_handler(60, callback) + + # attach a new loop + old_loop = self.loop + self.loop = test_utils.TestLoop() + + with unittest.mock.patch.object( + old_loop, + "remove_signal_handler") as m_old_remove_signal_handler, \ + unittest.mock.patch.object( + self.loop, + "add_signal_handler") as m_new_add_signal_handler: + + self.watcher.set_loop(self.loop) + + m_old_remove_signal_handler.assert_called_once_with( + signal.SIGCHLD) + m_new_add_signal_handler.assert_called_once_with( + signal.SIGCHLD, self.watcher._sig_chld) + + # child terminates + self.running = False + self.add_zombie(60, 9) + self.watcher._sig_chld() + + callback.assert_called_once_with(60, 9) + + @unittest.mock.patch('os.WTERMSIG', wraps=WTERMSIG) + @unittest.mock.patch('os.WEXITSTATUS', wraps=WEXITSTATUS) + @unittest.mock.patch('os.WIFSIGNALED', wraps=WIFSIGNALED) + @unittest.mock.patch('os.WIFEXITED', wraps=WIFEXITED) + @unittest.mock.patch('os.waitpid', wraps=waitpid) + def test_set_loop_race_condition( + self, m_waitpid, m_WIFEXITED, m_WIFSIGNALED, m_WEXITSTATUS, + m_WTERMSIG): + + # register 3 children + callback1 = unittest.mock.Mock() + callback2 = unittest.mock.Mock() + callback3 = unittest.mock.Mock() + + with self.watcher: + self.running = True + self.watcher.add_child_handler(61, callback1) + self.watcher.add_child_handler(62, callback2) + self.watcher.add_child_handler(622, callback3) + + # detach the loop + old_loop = self.loop + self.loop = None + + with unittest.mock.patch.object( + old_loop, "remove_signal_handler") as m_remove_signal_handler: + + self.watcher.set_loop(None) + + m_remove_signal_handler.assert_called_once_with( + signal.SIGCHLD) + + # child 1 & 2 terminate + self.add_zombie(61, 11) + self.add_zombie(62, -5) + + # SIGCHLD was not catched + self.assertFalse(callback1.called) + self.assertFalse(callback2.called) + self.assertFalse(callback3.called) + + # attach a new loop + self.loop = test_utils.TestLoop() + + with unittest.mock.patch.object( + self.loop, "add_signal_handler") as m_add_signal_handler: + + self.watcher.set_loop(self.loop) + + m_add_signal_handler.assert_called_once_with( + signal.SIGCHLD, self.watcher._sig_chld) + callback1.assert_called_once_with(61, 11) # race condition! + callback2.assert_called_once_with(62, -5) # race condition! + self.assertFalse(callback3.called) + + callback1.reset_mock() + callback2.reset_mock() + + # child 3 terminates + self.running = False + self.add_zombie(622, 19) + self.watcher._sig_chld() + + self.assertFalse(callback1.called) + self.assertFalse(callback2.called) + callback3.assert_called_once_with(622, 19) + + @unittest.mock.patch('os.WTERMSIG', wraps=WTERMSIG) + @unittest.mock.patch('os.WEXITSTATUS', wraps=WEXITSTATUS) + @unittest.mock.patch('os.WIFSIGNALED', wraps=WIFSIGNALED) + @unittest.mock.patch('os.WIFEXITED', wraps=WIFEXITED) + @unittest.mock.patch('os.waitpid', wraps=waitpid) + def test_close( + self, m_waitpid, m_WIFEXITED, m_WIFSIGNALED, m_WEXITSTATUS, + m_WTERMSIG): + + # register two children + callback1 = unittest.mock.Mock() + callback2 = unittest.mock.Mock() + + with self.watcher: + self.running = True + # child 1 terminates + self.add_zombie(63, 9) + # other child terminates + self.add_zombie(65, 18) + self.watcher._sig_chld() + + self.watcher.add_child_handler(63, callback1) + self.watcher.add_child_handler(64, callback1) + + self.assertEqual(len(self.watcher._callbacks), 1) + if isinstance(self.watcher, unix_events.FastChildWatcher): + self.assertEqual(len(self.watcher._zombies), 1) + + with unittest.mock.patch.object( + self.loop, + "remove_signal_handler") as m_remove_signal_handler: + + self.watcher.close() + + m_remove_signal_handler.assert_called_once_with( + signal.SIGCHLD) + self.assertFalse(self.watcher._callbacks) + if isinstance(self.watcher, unix_events.FastChildWatcher): + self.assertFalse(self.watcher._zombies) + + +class SafeChildWatcherTests (ChildWatcherTestsMixin, unittest.TestCase): + def create_watcher(self, loop): + return unix_events.SafeChildWatcher(loop) + + +class FastChildWatcherTests (ChildWatcherTestsMixin, unittest.TestCase): + def create_watcher(self, loop): + return unix_events.FastChildWatcher(loop) + + +class PolicyTests(unittest.TestCase): + + def create_policy(self): + return unix_events.DefaultEventLoopPolicy() + + def test_get_child_watcher(self): + policy = self.create_policy() + self.assertIsNone(policy._watcher) + + watcher = policy.get_child_watcher() + self.assertIsInstance(watcher, unix_events.SafeChildWatcher) + + self.assertIs(policy._watcher, watcher) + + self.assertIs(watcher, policy.get_child_watcher()) + self.assertIsNone(watcher._loop) + + def test_get_child_watcher_after_set(self): + policy = self.create_policy() + watcher = unix_events.FastChildWatcher(None) + + policy.set_child_watcher(watcher) + self.assertIs(policy._watcher, watcher) + self.assertIs(watcher, policy.get_child_watcher()) + + def test_get_child_watcher_with_mainloop_existing(self): + policy = self.create_policy() + loop = policy.get_event_loop() + + self.assertIsNone(policy._watcher) + watcher = policy.get_child_watcher() + + self.assertIsInstance(watcher, unix_events.SafeChildWatcher) + self.assertIs(watcher._loop, loop) + + loop.close() + + def test_get_child_watcher_thread(self): + + def f(): + policy.set_event_loop(policy.new_event_loop()) + + self.assertIsInstance(policy.get_event_loop(), + events.AbstractEventLoop) + watcher = policy.get_child_watcher() + + self.assertIsInstance(watcher, unix_events.SafeChildWatcher) + self.assertIsNone(watcher._loop) + + policy.get_event_loop().close() + + policy = self.create_policy() + + th = threading.Thread(target=f) + th.start() + th.join() + + def test_child_watcher_replace_mainloop_existing(self): + policy = self.create_policy() + loop = policy.get_event_loop() + + watcher = policy.get_child_watcher() + + self.assertIs(watcher._loop, loop) + + new_loop = policy.new_event_loop() + policy.set_event_loop(new_loop) + + self.assertIs(watcher._loop, new_loop) + + policy.set_event_loop(None) + + self.assertIs(watcher._loop, None) + + loop.close() + new_loop.close() + + if __name__ == '__main__': unittest.main() |