diff options
Diffstat (limited to 'Lib/test/test_io.py')
| -rw-r--r-- | Lib/test/test_io.py | 38 | 
1 files changed, 38 insertions, 0 deletions
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py index ad2919b..042879d 100644 --- a/Lib/test/test_io.py +++ b/Lib/test/test_io.py @@ -2561,12 +2561,50 @@ class SignalsTest(unittest.TestCase):      def test_interrupted_write_text(self):          self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii") +    def check_reentrant_write(self, data, **fdopen_kwargs): +        def on_alarm(*args): +            # Will be called reentrantly from the same thread +            wio.write(data) +            1/0 +        signal.signal(signal.SIGALRM, on_alarm) +        r, w = os.pipe() +        wio = self.io.open(w, **fdopen_kwargs) +        try: +            signal.alarm(1) +            # Either the reentrant call to wio.write() fails with RuntimeError, +            # or the signal handler raises ZeroDivisionError. +            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm: +                while 1: +                    for i in range(100): +                        wio.write(data) +                        wio.flush() +                    # Make sure the buffer doesn't fill up and block further writes +                    os.read(r, len(data) * 100) +            exc = cm.exception +            if isinstance(exc, RuntimeError): +                self.assertTrue(str(exc).startswith("reentrant call"), str(exc)) +        finally: +            wio.close() +            os.close(r) + +    def test_reentrant_write_buffered(self): +        self.check_reentrant_write(b"xy", mode="wb") + +    def test_reentrant_write_text(self): +        self.check_reentrant_write("xy", mode="w", encoding="ascii") + +  class CSignalsTest(SignalsTest):      io = io  class PySignalsTest(SignalsTest):      io = pyio +    # Handling reentrancy issues would slow down _pyio even more, so the +    # tests are disabled. +    test_reentrant_write_buffered = None +    test_reentrant_write_text = None +  def test_main():      tests = (CIOTest, PyIOTest,  | 
