diff options
Diffstat (limited to 'Lib/test/test_signal.py')
| -rw-r--r-- | Lib/test/test_signal.py | 107 | 
1 files changed, 107 insertions, 0 deletions
| diff --git a/Lib/test/test_signal.py b/Lib/test/test_signal.py index caca0be..04cd342 100644 --- a/Lib/test/test_signal.py +++ b/Lib/test/test_signal.py @@ -6,6 +6,7 @@ import gc  import pickle  import select  import signal +import socket  import struct  import subprocess  import traceback @@ -255,6 +256,13 @@ class WakeupFDTests(unittest.TestCase):          self.assertRaises((ValueError, OSError),                            signal.set_wakeup_fd, fd) +    def test_invalid_socket(self): +        sock = socket.socket() +        fd = sock.fileno() +        sock.close() +        self.assertRaises((ValueError, OSError), +                          signal.set_wakeup_fd, fd) +      def test_set_wakeup_fd_result(self):          r1, w1 = os.pipe()          self.addCleanup(os.close, r1) @@ -268,6 +276,20 @@ class WakeupFDTests(unittest.TestCase):          self.assertEqual(signal.set_wakeup_fd(-1), w2)          self.assertEqual(signal.set_wakeup_fd(-1), -1) +    def test_set_wakeup_fd_socket_result(self): +        sock1 = socket.socket() +        self.addCleanup(sock1.close) +        fd1 = sock1.fileno() + +        sock2 = socket.socket() +        self.addCleanup(sock2.close) +        fd2 = sock2.fileno() + +        signal.set_wakeup_fd(fd1) +        self.assertIs(signal.set_wakeup_fd(fd2), fd1) +        self.assertIs(signal.set_wakeup_fd(-1), fd2) +        self.assertIs(signal.set_wakeup_fd(-1), -1) +  @unittest.skipIf(sys.platform == "win32", "Not valid on Windows")  class WakeupSignalTests(unittest.TestCase): @@ -435,6 +457,90 @@ class WakeupSignalTests(unittest.TestCase):          """,  signal.SIGUSR1, signal.SIGUSR2, ordered=False) +@unittest.skipUnless(hasattr(socket, 'socketpair'), 'need socket.socketpair') +class WakeupSocketSignalTests(unittest.TestCase): + +    @unittest.skipIf(_testcapi is None, 'need _testcapi') +    def test_socket(self): +        # use a subprocess to have only one thread +        code = """if 1: +        import signal +        import socket +        import struct +        import _testcapi + +        signum = signal.SIGINT +        signals = (signum,) + +        def handler(signum, frame): +            pass + +        signal.signal(signum, handler) + +        read, write = socket.socketpair() +        read.setblocking(False) +        write.setblocking(False) +        signal.set_wakeup_fd(write.fileno()) + +        _testcapi.raise_signal(signum) + +        data = read.recv(1) +        if not data: +            raise Exception("no signum written") +        raised = struct.unpack('B', data) +        if raised != signals: +            raise Exception("%r != %r" % (raised, signals)) + +        read.close() +        write.close() +        """ + +        assert_python_ok('-c', code) + +    @unittest.skipIf(_testcapi is None, 'need _testcapi') +    def test_send_error(self): +        # Use a subprocess to have only one thread. +        if os.name == 'nt': +            action = 'send' +        else: +            action = 'write' +        code = """if 1: +        import errno +        import signal +        import socket +        import sys +        import time +        import _testcapi +        from test.support import captured_stderr + +        signum = signal.SIGINT + +        def handler(signum, frame): +            pass + +        signal.signal(signum, handler) + +        read, write = socket.socketpair() +        read.setblocking(False) +        write.setblocking(False) + +        signal.set_wakeup_fd(write.fileno()) + +        # Close sockets: send() will fail +        read.close() +        write.close() + +        with captured_stderr() as err: +            _testcapi.raise_signal(signum) + +        err = err.getvalue() +        if ('Exception ignored when trying to {action} to the signal wakeup fd' +            not in err): +            raise AssertionError(err) +        """.format(action=action) +        assert_python_ok('-c', code) + +  @unittest.skipIf(sys.platform == "win32", "Not valid on Windows")  class SiginterruptTest(unittest.TestCase): @@ -984,6 +1090,7 @@ def test_main():      try:          support.run_unittest(GenericTests, PosixTests, InterProcessSignalTests,                               WakeupFDTests, WakeupSignalTests, +                             WakeupSocketSignalTests,                               SiginterruptTest, ItimerTest, WindowsSignalTests,                               PendingSignalsTests)      finally: | 
