summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_asyncio/test_unix_events.py
diff options
context:
space:
mode:
authorGuido van Rossum <guido@dropbox.com>2013-11-04 23:50:46 (GMT)
committerGuido van Rossum <guido@dropbox.com>2013-11-04 23:50:46 (GMT)
commit0eaa5ac9b5b47d4ae0e851a8652fdc6be7a15963 (patch)
tree4d8d97d49e5a5aa9aa03df73450545ab4d780943 /Lib/test/test_asyncio/test_unix_events.py
parentccea08462b753fc78ec97cc5717de8f163b503ec (diff)
downloadcpython-0eaa5ac9b5b47d4ae0e851a8652fdc6be7a15963.zip
cpython-0eaa5ac9b5b47d4ae0e851a8652fdc6be7a15963.tar.gz
cpython-0eaa5ac9b5b47d4ae0e851a8652fdc6be7a15963.tar.bz2
asyncio: Refactor SIGCHLD handling. By Anthony Baire.
Diffstat (limited to 'Lib/test/test_asyncio/test_unix_events.py')
-rw-r--r--Lib/test/test_asyncio/test_unix_events.py987
1 files changed, 869 insertions, 118 deletions
diff --git a/Lib/test/test_asyncio/test_unix_events.py b/Lib/test/test_asyncio/test_unix_events.py
index f29e7af..a4d835e 100644
--- a/Lib/test/test_asyncio/test_unix_events.py
+++ b/Lib/test/test_asyncio/test_unix_events.py
@@ -3,10 +3,12 @@
import gc
import errno
import io
+import os
import pprint
import signal
import stat
import sys
+import threading
import unittest
import unittest.mock
@@ -181,124 +183,6 @@ class SelectorEventLoopTests(unittest.TestCase):
self.assertRaises(
RuntimeError, self.loop.remove_signal_handler, signal.SIGHUP)
- @unittest.mock.patch('os.WTERMSIG')
- @unittest.mock.patch('os.WEXITSTATUS')
- @unittest.mock.patch('os.WIFSIGNALED')
- @unittest.mock.patch('os.WIFEXITED')
- @unittest.mock.patch('os.waitpid')
- def test__sig_chld(self, m_waitpid, m_WIFEXITED, m_WIFSIGNALED,
- m_WEXITSTATUS, m_WTERMSIG):
- m_waitpid.side_effect = [(7, object()), ChildProcessError]
- m_WIFEXITED.return_value = True
- m_WIFSIGNALED.return_value = False
- m_WEXITSTATUS.return_value = 3
- transp = unittest.mock.Mock()
- self.loop._subprocesses[7] = transp
-
- self.loop._sig_chld()
- transp._process_exited.assert_called_with(3)
- self.assertFalse(m_WTERMSIG.called)
-
- @unittest.mock.patch('os.WTERMSIG')
- @unittest.mock.patch('os.WEXITSTATUS')
- @unittest.mock.patch('os.WIFSIGNALED')
- @unittest.mock.patch('os.WIFEXITED')
- @unittest.mock.patch('os.waitpid')
- def test__sig_chld_signal(self, m_waitpid, m_WIFEXITED, m_WIFSIGNALED,
- m_WEXITSTATUS, m_WTERMSIG):
- m_waitpid.side_effect = [(7, object()), ChildProcessError]
- m_WIFEXITED.return_value = False
- m_WIFSIGNALED.return_value = True
- m_WTERMSIG.return_value = 1
- transp = unittest.mock.Mock()
- self.loop._subprocesses[7] = transp
-
- self.loop._sig_chld()
- transp._process_exited.assert_called_with(-1)
- self.assertFalse(m_WEXITSTATUS.called)
-
- @unittest.mock.patch('os.WTERMSIG')
- @unittest.mock.patch('os.WEXITSTATUS')
- @unittest.mock.patch('os.WIFSIGNALED')
- @unittest.mock.patch('os.WIFEXITED')
- @unittest.mock.patch('os.waitpid')
- def test__sig_chld_zero_pid(self, m_waitpid, m_WIFEXITED, m_WIFSIGNALED,
- m_WEXITSTATUS, m_WTERMSIG):
- m_waitpid.side_effect = [(0, object()), ChildProcessError]
- transp = unittest.mock.Mock()
- self.loop._subprocesses[7] = transp
-
- self.loop._sig_chld()
- self.assertFalse(transp._process_exited.called)
- self.assertFalse(m_WIFSIGNALED.called)
- self.assertFalse(m_WIFEXITED.called)
- self.assertFalse(m_WTERMSIG.called)
- self.assertFalse(m_WEXITSTATUS.called)
-
- @unittest.mock.patch('os.WTERMSIG')
- @unittest.mock.patch('os.WEXITSTATUS')
- @unittest.mock.patch('os.WIFSIGNALED')
- @unittest.mock.patch('os.WIFEXITED')
- @unittest.mock.patch('os.waitpid')
- def test__sig_chld_not_registered_subprocess(self, m_waitpid,
- m_WIFEXITED, m_WIFSIGNALED,
- m_WEXITSTATUS, m_WTERMSIG):
- m_waitpid.side_effect = [(7, object()), ChildProcessError]
- m_WIFEXITED.return_value = True
- m_WIFSIGNALED.return_value = False
- m_WEXITSTATUS.return_value = 3
-
- self.loop._sig_chld()
- self.assertFalse(m_WTERMSIG.called)
-
- @unittest.mock.patch('os.WTERMSIG')
- @unittest.mock.patch('os.WEXITSTATUS')
- @unittest.mock.patch('os.WIFSIGNALED')
- @unittest.mock.patch('os.WIFEXITED')
- @unittest.mock.patch('os.waitpid')
- def test__sig_chld_unknown_status(self, m_waitpid,
- m_WIFEXITED, m_WIFSIGNALED,
- m_WEXITSTATUS, m_WTERMSIG):
- m_waitpid.side_effect = [(7, object()), ChildProcessError]
- m_WIFEXITED.return_value = False
- m_WIFSIGNALED.return_value = False
- transp = unittest.mock.Mock()
- self.loop._subprocesses[7] = transp
-
- self.loop._sig_chld()
- self.assertTrue(transp._process_exited.called)
- self.assertFalse(m_WEXITSTATUS.called)
- self.assertFalse(m_WTERMSIG.called)
-
- @unittest.mock.patch('asyncio.unix_events.logger')
- @unittest.mock.patch('os.WTERMSIG')
- @unittest.mock.patch('os.WEXITSTATUS')
- @unittest.mock.patch('os.WIFSIGNALED')
- @unittest.mock.patch('os.WIFEXITED')
- @unittest.mock.patch('os.waitpid')
- def test__sig_chld_unknown_status_in_handler(self, m_waitpid,
- m_WIFEXITED, m_WIFSIGNALED,
- m_WEXITSTATUS, m_WTERMSIG,
- m_log):
- m_waitpid.side_effect = Exception
- transp = unittest.mock.Mock()
- self.loop._subprocesses[7] = transp
-
- self.loop._sig_chld()
- self.assertFalse(transp._process_exited.called)
- self.assertFalse(m_WIFSIGNALED.called)
- self.assertFalse(m_WIFEXITED.called)
- self.assertFalse(m_WTERMSIG.called)
- self.assertFalse(m_WEXITSTATUS.called)
- m_log.exception.assert_called_with(
- 'Unknown exception in SIGCHLD handler')
-
- @unittest.mock.patch('os.waitpid')
- def test__sig_chld_process_error(self, m_waitpid):
- m_waitpid.side_effect = ChildProcessError
- self.loop._sig_chld()
- self.assertTrue(m_waitpid.called)
-
class UnixReadPipeTransportTests(unittest.TestCase):
@@ -777,5 +661,872 @@ class UnixWritePipeTransportTests(unittest.TestCase):
self.assertFalse(self.protocol.connection_lost.called)
+class AbstractChildWatcherTests(unittest.TestCase):
+
+ def test_not_implemented(self):
+ f = unittest.mock.Mock()
+ watcher = unix_events.AbstractChildWatcher()
+ self.assertRaises(
+ NotImplementedError, watcher.add_child_handler, f, f)
+ self.assertRaises(
+ NotImplementedError, watcher.remove_child_handler, f)
+ self.assertRaises(
+ NotImplementedError, watcher.set_loop, f)
+ self.assertRaises(
+ NotImplementedError, watcher.close)
+ self.assertRaises(
+ NotImplementedError, watcher.__enter__)
+ self.assertRaises(
+ NotImplementedError, watcher.__exit__, f, f, f)
+
+
+class BaseChildWatcherTests(unittest.TestCase):
+
+ def test_not_implemented(self):
+ f = unittest.mock.Mock()
+ watcher = unix_events.BaseChildWatcher(None)
+ self.assertRaises(
+ NotImplementedError, watcher._do_waitpid, f)
+
+
+class ChildWatcherTestsMixin:
+ instance = None
+
+ ignore_warnings = unittest.mock.patch.object(unix_events.logger, "warning")
+
+ def setUp(self):
+ self.loop = test_utils.TestLoop()
+ self.running = False
+ self.zombies = {}
+
+ assert ChildWatcherTestsMixin.instance is None
+ ChildWatcherTestsMixin.instance = self
+
+ with unittest.mock.patch.object(
+ self.loop, "add_signal_handler") as self.m_add_signal_handler:
+ self.watcher = self.create_watcher(self.loop)
+
+ def tearDown(self):
+ ChildWatcherTestsMixin.instance = None
+
+ def waitpid(pid, flags):
+ self = ChildWatcherTestsMixin.instance
+ if isinstance(self.watcher, unix_events.SafeChildWatcher) or pid != -1:
+ self.assertGreater(pid, 0)
+ try:
+ if pid < 0:
+ return self.zombies.popitem()
+ else:
+ return pid, self.zombies.pop(pid)
+ except KeyError:
+ pass
+ if self.running:
+ return 0, 0
+ else:
+ raise ChildProcessError()
+
+ def add_zombie(self, pid, returncode):
+ self.zombies[pid] = returncode + 32768
+
+ def WIFEXITED(status):
+ return status >= 32768
+
+ def WIFSIGNALED(status):
+ return 32700 < status < 32768
+
+ def WEXITSTATUS(status):
+ self = ChildWatcherTestsMixin.instance
+ self.assertTrue(type(self).WIFEXITED(status))
+ return status - 32768
+
+ def WTERMSIG(status):
+ self = ChildWatcherTestsMixin.instance
+ self.assertTrue(type(self).WIFSIGNALED(status))
+ return 32768 - status
+
+ def test_create_watcher(self):
+ self.m_add_signal_handler.assert_called_once_with(
+ signal.SIGCHLD, self.watcher._sig_chld)
+
+ @unittest.mock.patch('os.WTERMSIG', wraps=WTERMSIG)
+ @unittest.mock.patch('os.WEXITSTATUS', wraps=WEXITSTATUS)
+ @unittest.mock.patch('os.WIFSIGNALED', wraps=WIFSIGNALED)
+ @unittest.mock.patch('os.WIFEXITED', wraps=WIFEXITED)
+ @unittest.mock.patch('os.waitpid', wraps=waitpid)
+ def test_sigchld(self, m_waitpid, m_WIFEXITED, m_WIFSIGNALED,
+ m_WEXITSTATUS, m_WTERMSIG):
+ # register a child
+ callback = unittest.mock.Mock()
+
+ with self.watcher:
+ self.running = True
+ self.watcher.add_child_handler(42, callback, 9, 10, 14)
+
+ self.assertFalse(callback.called)
+ self.assertFalse(m_WIFEXITED.called)
+ self.assertFalse(m_WIFSIGNALED.called)
+ self.assertFalse(m_WEXITSTATUS.called)
+ self.assertFalse(m_WTERMSIG.called)
+
+ # child is running
+ self.watcher._sig_chld()
+
+ self.assertFalse(callback.called)
+ self.assertFalse(m_WIFEXITED.called)
+ self.assertFalse(m_WIFSIGNALED.called)
+ self.assertFalse(m_WEXITSTATUS.called)
+ self.assertFalse(m_WTERMSIG.called)
+
+ # child terminates (returncode 12)
+ self.running = False
+ self.add_zombie(42, 12)
+ self.watcher._sig_chld()
+
+ self.assertTrue(m_WIFEXITED.called)
+ self.assertTrue(m_WEXITSTATUS.called)
+ self.assertFalse(m_WTERMSIG.called)
+ callback.assert_called_once_with(42, 12, 9, 10, 14)
+
+ m_WIFSIGNALED.reset_mock()
+ m_WIFEXITED.reset_mock()
+ m_WEXITSTATUS.reset_mock()
+ callback.reset_mock()
+
+ # ensure that the child is effectively reaped
+ self.add_zombie(42, 13)
+ with self.ignore_warnings:
+ self.watcher._sig_chld()
+
+ self.assertFalse(callback.called)
+ self.assertFalse(m_WTERMSIG.called)
+
+ m_WIFSIGNALED.reset_mock()
+ m_WIFEXITED.reset_mock()
+ m_WEXITSTATUS.reset_mock()
+
+ # sigchld called again
+ self.zombies.clear()
+ self.watcher._sig_chld()
+
+ self.assertFalse(callback.called)
+ self.assertFalse(m_WIFEXITED.called)
+ self.assertFalse(m_WIFSIGNALED.called)
+ self.assertFalse(m_WEXITSTATUS.called)
+ self.assertFalse(m_WTERMSIG.called)
+
+ @unittest.mock.patch('os.WTERMSIG', wraps=WTERMSIG)
+ @unittest.mock.patch('os.WEXITSTATUS', wraps=WEXITSTATUS)
+ @unittest.mock.patch('os.WIFSIGNALED', wraps=WIFSIGNALED)
+ @unittest.mock.patch('os.WIFEXITED', wraps=WIFEXITED)
+ @unittest.mock.patch('os.waitpid', wraps=waitpid)
+ def test_sigchld_two_children(self, m_waitpid, m_WIFEXITED, m_WIFSIGNALED,
+ m_WEXITSTATUS, m_WTERMSIG):
+ callback1 = unittest.mock.Mock()
+ callback2 = unittest.mock.Mock()
+
+ # register child 1
+ with self.watcher:
+ self.running = True
+ self.watcher.add_child_handler(43, callback1, 7, 8)
+
+ self.assertFalse(callback1.called)
+ self.assertFalse(callback2.called)
+ self.assertFalse(m_WIFEXITED.called)
+ self.assertFalse(m_WIFSIGNALED.called)
+ self.assertFalse(m_WEXITSTATUS.called)
+ self.assertFalse(m_WTERMSIG.called)
+
+ # register child 2
+ with self.watcher:
+ self.watcher.add_child_handler(44, callback2, 147, 18)
+
+ self.assertFalse(callback1.called)
+ self.assertFalse(callback2.called)
+ self.assertFalse(m_WIFEXITED.called)
+ self.assertFalse(m_WIFSIGNALED.called)
+ self.assertFalse(m_WEXITSTATUS.called)
+ self.assertFalse(m_WTERMSIG.called)
+
+ # childen are running
+ self.watcher._sig_chld()
+
+ self.assertFalse(callback1.called)
+ self.assertFalse(callback2.called)
+ self.assertFalse(m_WIFEXITED.called)
+ self.assertFalse(m_WIFSIGNALED.called)
+ self.assertFalse(m_WEXITSTATUS.called)
+ self.assertFalse(m_WTERMSIG.called)
+
+ # child 1 terminates (signal 3)
+ self.add_zombie(43, -3)
+ self.watcher._sig_chld()
+
+ callback1.assert_called_once_with(43, -3, 7, 8)
+ self.assertFalse(callback2.called)
+ self.assertTrue(m_WIFSIGNALED.called)
+ self.assertFalse(m_WEXITSTATUS.called)
+ self.assertTrue(m_WTERMSIG.called)
+
+ m_WIFSIGNALED.reset_mock()
+ m_WIFEXITED.reset_mock()
+ m_WTERMSIG.reset_mock()
+ callback1.reset_mock()
+
+ # child 2 still running
+ self.watcher._sig_chld()
+
+ self.assertFalse(callback1.called)
+ self.assertFalse(callback2.called)
+ self.assertFalse(m_WIFEXITED.called)
+ self.assertFalse(m_WIFSIGNALED.called)
+ self.assertFalse(m_WEXITSTATUS.called)
+ self.assertFalse(m_WTERMSIG.called)
+
+ # child 2 terminates (code 108)
+ self.add_zombie(44, 108)
+ self.running = False
+ self.watcher._sig_chld()
+
+ callback2.assert_called_once_with(44, 108, 147, 18)
+ self.assertFalse(callback1.called)
+ self.assertTrue(m_WIFEXITED.called)
+ self.assertTrue(m_WEXITSTATUS.called)
+ self.assertFalse(m_WTERMSIG.called)
+
+ m_WIFSIGNALED.reset_mock()
+ m_WIFEXITED.reset_mock()
+ m_WEXITSTATUS.reset_mock()
+ callback2.reset_mock()
+
+ # ensure that the children are effectively reaped
+ self.add_zombie(43, 14)
+ self.add_zombie(44, 15)
+ with self.ignore_warnings:
+ self.watcher._sig_chld()
+
+ self.assertFalse(callback1.called)
+ self.assertFalse(callback2.called)
+ self.assertFalse(m_WTERMSIG.called)
+
+ m_WIFSIGNALED.reset_mock()
+ m_WIFEXITED.reset_mock()
+ m_WEXITSTATUS.reset_mock()
+
+ # sigchld called again
+ self.zombies.clear()
+ self.watcher._sig_chld()
+
+ self.assertFalse(callback1.called)
+ self.assertFalse(callback2.called)
+ self.assertFalse(m_WIFEXITED.called)
+ self.assertFalse(m_WIFSIGNALED.called)
+ self.assertFalse(m_WEXITSTATUS.called)
+ self.assertFalse(m_WTERMSIG.called)
+
+ @unittest.mock.patch('os.WTERMSIG', wraps=WTERMSIG)
+ @unittest.mock.patch('os.WEXITSTATUS', wraps=WEXITSTATUS)
+ @unittest.mock.patch('os.WIFSIGNALED', wraps=WIFSIGNALED)
+ @unittest.mock.patch('os.WIFEXITED', wraps=WIFEXITED)
+ @unittest.mock.patch('os.waitpid', wraps=waitpid)
+ def test_sigchld_two_children_terminating_together(
+ self, m_waitpid, m_WIFEXITED, m_WIFSIGNALED, m_WEXITSTATUS,
+ m_WTERMSIG):
+ callback1 = unittest.mock.Mock()
+ callback2 = unittest.mock.Mock()
+
+ # register child 1
+ with self.watcher:
+ self.running = True
+ self.watcher.add_child_handler(45, callback1, 17, 8)
+
+ self.assertFalse(callback1.called)
+ self.assertFalse(callback2.called)
+ self.assertFalse(m_WIFEXITED.called)
+ self.assertFalse(m_WIFSIGNALED.called)
+ self.assertFalse(m_WEXITSTATUS.called)
+ self.assertFalse(m_WTERMSIG.called)
+
+ # register child 2
+ with self.watcher:
+ self.watcher.add_child_handler(46, callback2, 1147, 18)
+
+ self.assertFalse(callback1.called)
+ self.assertFalse(callback2.called)
+ self.assertFalse(m_WIFEXITED.called)
+ self.assertFalse(m_WIFSIGNALED.called)
+ self.assertFalse(m_WEXITSTATUS.called)
+ self.assertFalse(m_WTERMSIG.called)
+
+ # childen are running
+ self.watcher._sig_chld()
+
+ self.assertFalse(callback1.called)
+ self.assertFalse(callback2.called)
+ self.assertFalse(m_WIFEXITED.called)
+ self.assertFalse(m_WIFSIGNALED.called)
+ self.assertFalse(m_WEXITSTATUS.called)
+ self.assertFalse(m_WTERMSIG.called)
+
+ # child 1 terminates (code 78)
+ # child 2 terminates (signal 5)
+ self.add_zombie(45, 78)
+ self.add_zombie(46, -5)
+ self.running = False
+ self.watcher._sig_chld()
+
+ callback1.assert_called_once_with(45, 78, 17, 8)
+ callback2.assert_called_once_with(46, -5, 1147, 18)
+ self.assertTrue(m_WIFSIGNALED.called)
+ self.assertTrue(m_WIFEXITED.called)
+ self.assertTrue(m_WEXITSTATUS.called)
+ self.assertTrue(m_WTERMSIG.called)
+
+ m_WIFSIGNALED.reset_mock()
+ m_WIFEXITED.reset_mock()
+ m_WTERMSIG.reset_mock()
+ m_WEXITSTATUS.reset_mock()
+ callback1.reset_mock()
+ callback2.reset_mock()
+
+ # ensure that the children are effectively reaped
+ self.add_zombie(45, 14)
+ self.add_zombie(46, 15)
+ with self.ignore_warnings:
+ self.watcher._sig_chld()
+
+ self.assertFalse(callback1.called)
+ self.assertFalse(callback2.called)
+ self.assertFalse(m_WTERMSIG.called)
+
+ @unittest.mock.patch('os.WTERMSIG', wraps=WTERMSIG)
+ @unittest.mock.patch('os.WEXITSTATUS', wraps=WEXITSTATUS)
+ @unittest.mock.patch('os.WIFSIGNALED', wraps=WIFSIGNALED)
+ @unittest.mock.patch('os.WIFEXITED', wraps=WIFEXITED)
+ @unittest.mock.patch('os.waitpid', wraps=waitpid)
+ def test_sigchld_race_condition(
+ self, m_waitpid, m_WIFEXITED, m_WIFSIGNALED, m_WEXITSTATUS,
+ m_WTERMSIG):
+ # register a child
+ callback = unittest.mock.Mock()
+
+ with self.watcher:
+ # child terminates before being registered
+ self.add_zombie(50, 4)
+ self.watcher._sig_chld()
+
+ self.watcher.add_child_handler(50, callback, 1, 12)
+
+ callback.assert_called_once_with(50, 4, 1, 12)
+ callback.reset_mock()
+
+ # ensure that the child is effectively reaped
+ self.add_zombie(50, -1)
+ with self.ignore_warnings:
+ self.watcher._sig_chld()
+
+ self.assertFalse(callback.called)
+
+ @unittest.mock.patch('os.WTERMSIG', wraps=WTERMSIG)
+ @unittest.mock.patch('os.WEXITSTATUS', wraps=WEXITSTATUS)
+ @unittest.mock.patch('os.WIFSIGNALED', wraps=WIFSIGNALED)
+ @unittest.mock.patch('os.WIFEXITED', wraps=WIFEXITED)
+ @unittest.mock.patch('os.waitpid', wraps=waitpid)
+ def test_sigchld_replace_handler(
+ self, m_waitpid, m_WIFEXITED, m_WIFSIGNALED, m_WEXITSTATUS,
+ m_WTERMSIG):
+ callback1 = unittest.mock.Mock()
+ callback2 = unittest.mock.Mock()
+
+ # register a child
+ with self.watcher:
+ self.running = True
+ self.watcher.add_child_handler(51, callback1, 19)
+
+ self.assertFalse(callback1.called)
+ self.assertFalse(callback2.called)
+ self.assertFalse(m_WIFEXITED.called)
+ self.assertFalse(m_WIFSIGNALED.called)
+ self.assertFalse(m_WEXITSTATUS.called)
+ self.assertFalse(m_WTERMSIG.called)
+
+ # register the same child again
+ with self.watcher:
+ self.watcher.add_child_handler(51, callback2, 21)
+
+ self.assertFalse(callback1.called)
+ self.assertFalse(callback2.called)
+ self.assertFalse(m_WIFEXITED.called)
+ self.assertFalse(m_WIFSIGNALED.called)
+ self.assertFalse(m_WEXITSTATUS.called)
+ self.assertFalse(m_WTERMSIG.called)
+
+ # child terminates (signal 8)
+ self.running = False
+ self.add_zombie(51, -8)
+ self.watcher._sig_chld()
+
+ callback2.assert_called_once_with(51, -8, 21)
+ self.assertFalse(callback1.called)
+ self.assertTrue(m_WIFSIGNALED.called)
+ self.assertFalse(m_WEXITSTATUS.called)
+ self.assertTrue(m_WTERMSIG.called)
+
+ m_WIFSIGNALED.reset_mock()
+ m_WIFEXITED.reset_mock()
+ m_WTERMSIG.reset_mock()
+ callback2.reset_mock()
+
+ # ensure that the child is effectively reaped
+ self.add_zombie(51, 13)
+ with self.ignore_warnings:
+ self.watcher._sig_chld()
+
+ self.assertFalse(callback1.called)
+ self.assertFalse(callback2.called)
+ self.assertFalse(m_WTERMSIG.called)
+
+ @unittest.mock.patch('os.WTERMSIG', wraps=WTERMSIG)
+ @unittest.mock.patch('os.WEXITSTATUS', wraps=WEXITSTATUS)
+ @unittest.mock.patch('os.WIFSIGNALED', wraps=WIFSIGNALED)
+ @unittest.mock.patch('os.WIFEXITED', wraps=WIFEXITED)
+ @unittest.mock.patch('os.waitpid', wraps=waitpid)
+ def test_sigchld_remove_handler(self, m_waitpid, m_WIFEXITED,
+ m_WIFSIGNALED, m_WEXITSTATUS, m_WTERMSIG):
+ callback = unittest.mock.Mock()
+
+ # register a child
+ with self.watcher:
+ self.running = True
+ self.watcher.add_child_handler(52, callback, 1984)
+
+ self.assertFalse(callback.called)
+ self.assertFalse(m_WIFEXITED.called)
+ self.assertFalse(m_WIFSIGNALED.called)
+ self.assertFalse(m_WEXITSTATUS.called)
+ self.assertFalse(m_WTERMSIG.called)
+
+ # unregister the child
+ self.watcher.remove_child_handler(52)
+
+ self.assertFalse(callback.called)
+ self.assertFalse(m_WIFEXITED.called)
+ self.assertFalse(m_WIFSIGNALED.called)
+ self.assertFalse(m_WEXITSTATUS.called)
+ self.assertFalse(m_WTERMSIG.called)
+
+ # child terminates (code 99)
+ self.running = False
+ self.add_zombie(52, 99)
+ with self.ignore_warnings:
+ self.watcher._sig_chld()
+
+ self.assertFalse(callback.called)
+
+ @unittest.mock.patch('os.WTERMSIG', wraps=WTERMSIG)
+ @unittest.mock.patch('os.WEXITSTATUS', wraps=WEXITSTATUS)
+ @unittest.mock.patch('os.WIFSIGNALED', wraps=WIFSIGNALED)
+ @unittest.mock.patch('os.WIFEXITED', wraps=WIFEXITED)
+ @unittest.mock.patch('os.waitpid', wraps=waitpid)
+ def test_sigchld_unknown_status(self, m_waitpid, m_WIFEXITED,
+ m_WIFSIGNALED, m_WEXITSTATUS, m_WTERMSIG):
+ callback = unittest.mock.Mock()
+
+ # register a child
+ with self.watcher:
+ self.running = True
+ self.watcher.add_child_handler(53, callback, -19)
+
+ self.assertFalse(callback.called)
+ self.assertFalse(m_WIFEXITED.called)
+ self.assertFalse(m_WIFSIGNALED.called)
+ self.assertFalse(m_WEXITSTATUS.called)
+ self.assertFalse(m_WTERMSIG.called)
+
+ # terminate with unknown status
+ self.zombies[53] = 1178
+ self.running = False
+ self.watcher._sig_chld()
+
+ callback.assert_called_once_with(53, 1178, -19)
+ self.assertTrue(m_WIFEXITED.called)
+ self.assertTrue(m_WIFSIGNALED.called)
+ self.assertFalse(m_WEXITSTATUS.called)
+ self.assertFalse(m_WTERMSIG.called)
+
+ callback.reset_mock()
+ m_WIFEXITED.reset_mock()
+ m_WIFSIGNALED.reset_mock()
+
+ # ensure that the child is effectively reaped
+ self.add_zombie(53, 101)
+ with self.ignore_warnings:
+ self.watcher._sig_chld()
+
+ self.assertFalse(callback.called)
+
+ @unittest.mock.patch('os.WTERMSIG', wraps=WTERMSIG)
+ @unittest.mock.patch('os.WEXITSTATUS', wraps=WEXITSTATUS)
+ @unittest.mock.patch('os.WIFSIGNALED', wraps=WIFSIGNALED)
+ @unittest.mock.patch('os.WIFEXITED', wraps=WIFEXITED)
+ @unittest.mock.patch('os.waitpid', wraps=waitpid)
+ def test_remove_child_handler(self, m_waitpid, m_WIFEXITED,
+ m_WIFSIGNALED, m_WEXITSTATUS, m_WTERMSIG):
+ callback1 = unittest.mock.Mock()
+ callback2 = unittest.mock.Mock()
+ callback3 = unittest.mock.Mock()
+
+ # register children
+ with self.watcher:
+ self.running = True
+ self.watcher.add_child_handler(54, callback1, 1)
+ self.watcher.add_child_handler(55, callback2, 2)
+ self.watcher.add_child_handler(56, callback3, 3)
+
+ # remove child handler 1
+ self.assertTrue(self.watcher.remove_child_handler(54))
+
+ # remove child handler 2 multiple times
+ self.assertTrue(self.watcher.remove_child_handler(55))
+ self.assertFalse(self.watcher.remove_child_handler(55))
+ self.assertFalse(self.watcher.remove_child_handler(55))
+
+ # all children terminate
+ self.add_zombie(54, 0)
+ self.add_zombie(55, 1)
+ self.add_zombie(56, 2)
+ self.running = False
+ with self.ignore_warnings:
+ self.watcher._sig_chld()
+
+ self.assertFalse(callback1.called)
+ self.assertFalse(callback2.called)
+ callback3.assert_called_once_with(56, 2, 3)
+
+ @unittest.mock.patch('os.waitpid', wraps=waitpid)
+ def test_sigchld_unhandled_exception(self, m_waitpid):
+ callback = unittest.mock.Mock()
+
+ # register a child
+ with self.watcher:
+ self.running = True
+ self.watcher.add_child_handler(57, callback)
+
+ # raise an exception
+ m_waitpid.side_effect = ValueError
+
+ with unittest.mock.patch.object(unix_events.logger,
+ "exception") as m_exception:
+
+ self.assertEqual(self.watcher._sig_chld(), None)
+ self.assertTrue(m_exception.called)
+
+ @unittest.mock.patch('os.WTERMSIG', wraps=WTERMSIG)
+ @unittest.mock.patch('os.WEXITSTATUS', wraps=WEXITSTATUS)
+ @unittest.mock.patch('os.WIFSIGNALED', wraps=WIFSIGNALED)
+ @unittest.mock.patch('os.WIFEXITED', wraps=WIFEXITED)
+ @unittest.mock.patch('os.waitpid', wraps=waitpid)
+ def test_sigchld_child_reaped_elsewhere(
+ self, m_waitpid, m_WIFEXITED, m_WIFSIGNALED, m_WEXITSTATUS,
+ m_WTERMSIG):
+
+ # register a child
+ callback = unittest.mock.Mock()
+
+ with self.watcher:
+ self.running = True
+ self.watcher.add_child_handler(58, callback)
+
+ self.assertFalse(callback.called)
+ self.assertFalse(m_WIFEXITED.called)
+ self.assertFalse(m_WIFSIGNALED.called)
+ self.assertFalse(m_WEXITSTATUS.called)
+ self.assertFalse(m_WTERMSIG.called)
+
+ # child terminates
+ self.running = False
+ self.add_zombie(58, 4)
+
+ # waitpid is called elsewhere
+ os.waitpid(58, os.WNOHANG)
+
+ m_waitpid.reset_mock()
+
+ # sigchld
+ with self.ignore_warnings:
+ self.watcher._sig_chld()
+
+ callback.assert_called(m_waitpid)
+ if isinstance(self.watcher, unix_events.FastChildWatcher):
+ # here the FastChildWatche enters a deadlock
+ # (there is no way to prevent it)
+ self.assertFalse(callback.called)
+ else:
+ callback.assert_called_once_with(58, 255)
+
+ @unittest.mock.patch('os.WTERMSIG', wraps=WTERMSIG)
+ @unittest.mock.patch('os.WEXITSTATUS', wraps=WEXITSTATUS)
+ @unittest.mock.patch('os.WIFSIGNALED', wraps=WIFSIGNALED)
+ @unittest.mock.patch('os.WIFEXITED', wraps=WIFEXITED)
+ @unittest.mock.patch('os.waitpid', wraps=waitpid)
+ def test_sigchld_unknown_pid_during_registration(
+ self, m_waitpid, m_WIFEXITED, m_WIFSIGNALED, m_WEXITSTATUS,
+ m_WTERMSIG):
+
+ # register two children
+ callback1 = unittest.mock.Mock()
+ callback2 = unittest.mock.Mock()
+
+ with self.ignore_warnings, self.watcher:
+ self.running = True
+ # child 1 terminates
+ self.add_zombie(591, 7)
+ # an unknown child terminates
+ self.add_zombie(593, 17)
+
+ self.watcher._sig_chld()
+
+ self.watcher.add_child_handler(591, callback1)
+ self.watcher.add_child_handler(592, callback2)
+
+ callback1.assert_called_once_with(591, 7)
+ self.assertFalse(callback2.called)
+
+ @unittest.mock.patch('os.WTERMSIG', wraps=WTERMSIG)
+ @unittest.mock.patch('os.WEXITSTATUS', wraps=WEXITSTATUS)
+ @unittest.mock.patch('os.WIFSIGNALED', wraps=WIFSIGNALED)
+ @unittest.mock.patch('os.WIFEXITED', wraps=WIFEXITED)
+ @unittest.mock.patch('os.waitpid', wraps=waitpid)
+ def test_set_loop(
+ self, m_waitpid, m_WIFEXITED, m_WIFSIGNALED, m_WEXITSTATUS,
+ m_WTERMSIG):
+
+ # register a child
+ callback = unittest.mock.Mock()
+
+ with self.watcher:
+ self.running = True
+ self.watcher.add_child_handler(60, callback)
+
+ # attach a new loop
+ old_loop = self.loop
+ self.loop = test_utils.TestLoop()
+
+ with unittest.mock.patch.object(
+ old_loop,
+ "remove_signal_handler") as m_old_remove_signal_handler, \
+ unittest.mock.patch.object(
+ self.loop,
+ "add_signal_handler") as m_new_add_signal_handler:
+
+ self.watcher.set_loop(self.loop)
+
+ m_old_remove_signal_handler.assert_called_once_with(
+ signal.SIGCHLD)
+ m_new_add_signal_handler.assert_called_once_with(
+ signal.SIGCHLD, self.watcher._sig_chld)
+
+ # child terminates
+ self.running = False
+ self.add_zombie(60, 9)
+ self.watcher._sig_chld()
+
+ callback.assert_called_once_with(60, 9)
+
+ @unittest.mock.patch('os.WTERMSIG', wraps=WTERMSIG)
+ @unittest.mock.patch('os.WEXITSTATUS', wraps=WEXITSTATUS)
+ @unittest.mock.patch('os.WIFSIGNALED', wraps=WIFSIGNALED)
+ @unittest.mock.patch('os.WIFEXITED', wraps=WIFEXITED)
+ @unittest.mock.patch('os.waitpid', wraps=waitpid)
+ def test_set_loop_race_condition(
+ self, m_waitpid, m_WIFEXITED, m_WIFSIGNALED, m_WEXITSTATUS,
+ m_WTERMSIG):
+
+ # register 3 children
+ callback1 = unittest.mock.Mock()
+ callback2 = unittest.mock.Mock()
+ callback3 = unittest.mock.Mock()
+
+ with self.watcher:
+ self.running = True
+ self.watcher.add_child_handler(61, callback1)
+ self.watcher.add_child_handler(62, callback2)
+ self.watcher.add_child_handler(622, callback3)
+
+ # detach the loop
+ old_loop = self.loop
+ self.loop = None
+
+ with unittest.mock.patch.object(
+ old_loop, "remove_signal_handler") as m_remove_signal_handler:
+
+ self.watcher.set_loop(None)
+
+ m_remove_signal_handler.assert_called_once_with(
+ signal.SIGCHLD)
+
+ # child 1 & 2 terminate
+ self.add_zombie(61, 11)
+ self.add_zombie(62, -5)
+
+ # SIGCHLD was not catched
+ self.assertFalse(callback1.called)
+ self.assertFalse(callback2.called)
+ self.assertFalse(callback3.called)
+
+ # attach a new loop
+ self.loop = test_utils.TestLoop()
+
+ with unittest.mock.patch.object(
+ self.loop, "add_signal_handler") as m_add_signal_handler:
+
+ self.watcher.set_loop(self.loop)
+
+ m_add_signal_handler.assert_called_once_with(
+ signal.SIGCHLD, self.watcher._sig_chld)
+ callback1.assert_called_once_with(61, 11) # race condition!
+ callback2.assert_called_once_with(62, -5) # race condition!
+ self.assertFalse(callback3.called)
+
+ callback1.reset_mock()
+ callback2.reset_mock()
+
+ # child 3 terminates
+ self.running = False
+ self.add_zombie(622, 19)
+ self.watcher._sig_chld()
+
+ self.assertFalse(callback1.called)
+ self.assertFalse(callback2.called)
+ callback3.assert_called_once_with(622, 19)
+
+ @unittest.mock.patch('os.WTERMSIG', wraps=WTERMSIG)
+ @unittest.mock.patch('os.WEXITSTATUS', wraps=WEXITSTATUS)
+ @unittest.mock.patch('os.WIFSIGNALED', wraps=WIFSIGNALED)
+ @unittest.mock.patch('os.WIFEXITED', wraps=WIFEXITED)
+ @unittest.mock.patch('os.waitpid', wraps=waitpid)
+ def test_close(
+ self, m_waitpid, m_WIFEXITED, m_WIFSIGNALED, m_WEXITSTATUS,
+ m_WTERMSIG):
+
+ # register two children
+ callback1 = unittest.mock.Mock()
+ callback2 = unittest.mock.Mock()
+
+ with self.watcher:
+ self.running = True
+ # child 1 terminates
+ self.add_zombie(63, 9)
+ # other child terminates
+ self.add_zombie(65, 18)
+ self.watcher._sig_chld()
+
+ self.watcher.add_child_handler(63, callback1)
+ self.watcher.add_child_handler(64, callback1)
+
+ self.assertEqual(len(self.watcher._callbacks), 1)
+ if isinstance(self.watcher, unix_events.FastChildWatcher):
+ self.assertEqual(len(self.watcher._zombies), 1)
+
+ with unittest.mock.patch.object(
+ self.loop,
+ "remove_signal_handler") as m_remove_signal_handler:
+
+ self.watcher.close()
+
+ m_remove_signal_handler.assert_called_once_with(
+ signal.SIGCHLD)
+ self.assertFalse(self.watcher._callbacks)
+ if isinstance(self.watcher, unix_events.FastChildWatcher):
+ self.assertFalse(self.watcher._zombies)
+
+
+class SafeChildWatcherTests (ChildWatcherTestsMixin, unittest.TestCase):
+ def create_watcher(self, loop):
+ return unix_events.SafeChildWatcher(loop)
+
+
+class FastChildWatcherTests (ChildWatcherTestsMixin, unittest.TestCase):
+ def create_watcher(self, loop):
+ return unix_events.FastChildWatcher(loop)
+
+
+class PolicyTests(unittest.TestCase):
+
+ def create_policy(self):
+ return unix_events.DefaultEventLoopPolicy()
+
+ def test_get_child_watcher(self):
+ policy = self.create_policy()
+ self.assertIsNone(policy._watcher)
+
+ watcher = policy.get_child_watcher()
+ self.assertIsInstance(watcher, unix_events.SafeChildWatcher)
+
+ self.assertIs(policy._watcher, watcher)
+
+ self.assertIs(watcher, policy.get_child_watcher())
+ self.assertIsNone(watcher._loop)
+
+ def test_get_child_watcher_after_set(self):
+ policy = self.create_policy()
+ watcher = unix_events.FastChildWatcher(None)
+
+ policy.set_child_watcher(watcher)
+ self.assertIs(policy._watcher, watcher)
+ self.assertIs(watcher, policy.get_child_watcher())
+
+ def test_get_child_watcher_with_mainloop_existing(self):
+ policy = self.create_policy()
+ loop = policy.get_event_loop()
+
+ self.assertIsNone(policy._watcher)
+ watcher = policy.get_child_watcher()
+
+ self.assertIsInstance(watcher, unix_events.SafeChildWatcher)
+ self.assertIs(watcher._loop, loop)
+
+ loop.close()
+
+ def test_get_child_watcher_thread(self):
+
+ def f():
+ policy.set_event_loop(policy.new_event_loop())
+
+ self.assertIsInstance(policy.get_event_loop(),
+ events.AbstractEventLoop)
+ watcher = policy.get_child_watcher()
+
+ self.assertIsInstance(watcher, unix_events.SafeChildWatcher)
+ self.assertIsNone(watcher._loop)
+
+ policy.get_event_loop().close()
+
+ policy = self.create_policy()
+
+ th = threading.Thread(target=f)
+ th.start()
+ th.join()
+
+ def test_child_watcher_replace_mainloop_existing(self):
+ policy = self.create_policy()
+ loop = policy.get_event_loop()
+
+ watcher = policy.get_child_watcher()
+
+ self.assertIs(watcher._loop, loop)
+
+ new_loop = policy.new_event_loop()
+ policy.set_event_loop(new_loop)
+
+ self.assertIs(watcher._loop, new_loop)
+
+ policy.set_event_loop(None)
+
+ self.assertIs(watcher._loop, None)
+
+ loop.close()
+ new_loop.close()
+
+
if __name__ == '__main__':
unittest.main()