diff options
Diffstat (limited to 'Lib/test/test_io.py')
| -rw-r--r-- | Lib/test/test_io.py | 51 | 
1 files changed, 31 insertions, 20 deletions
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py index eb2ac5f..318f7a7 100644 --- a/Lib/test/test_io.py +++ b/Lib/test/test_io.py @@ -46,7 +46,7 @@ except ImportError:  def _default_chunk_size():      """Get the default TextIOWrapper chunk size""" -    with open(__file__, "r", encoding="latin1") as f: +    with open(__file__, "r", encoding="latin-1") as f:          return f._CHUNK_SIZE @@ -621,6 +621,15 @@ class IOTest(unittest.TestCase):          for obj in test:              self.assertTrue(hasattr(obj, "__dict__")) +    def test_opener(self): +        with self.open(support.TESTFN, "w") as f: +            f.write("egg\n") +        fd = os.open(support.TESTFN, os.O_RDONLY) +        def opener(path, flags): +            return fd +        with self.open("non-existent", "r", opener=opener) as f: +            self.assertEqual(f.read(), "egg\n") +  class CIOTest(IOTest):      def test_IOBase_finalize(self): @@ -822,6 +831,12 @@ class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):          self.assertEqual(b, b"gf")          self.assertEqual(bufio.readinto(b), 0)          self.assertEqual(b, b"gf") +        rawio = self.MockRawIO((b"abc", None)) +        bufio = self.tp(rawio) +        self.assertEqual(bufio.readinto(b), 2) +        self.assertEqual(b, b"ab") +        self.assertEqual(bufio.readinto(b), 1) +        self.assertEqual(b, b"cb")      def test_readlines(self):          def bufio(): @@ -1789,11 +1804,11 @@ class TextIOWrapperTest(unittest.TestCase):          r = self.BytesIO(b"\xc3\xa9\n\n")          b = self.BufferedReader(r, 1000)          t = self.TextIOWrapper(b) -        t.__init__(b, encoding="latin1", newline="\r\n") -        self.assertEqual(t.encoding, "latin1") +        t.__init__(b, encoding="latin-1", newline="\r\n") +        self.assertEqual(t.encoding, "latin-1")          self.assertEqual(t.line_buffering, False) -        t.__init__(b, encoding="utf8", line_buffering=True) -        self.assertEqual(t.encoding, "utf8") +        t.__init__(b, encoding="utf-8", line_buffering=True) +        self.assertEqual(t.encoding, "utf-8")          self.assertEqual(t.line_buffering, True)          self.assertEqual("\xe9\n", t.readline())          self.assertRaises(TypeError, t.__init__, b, newline=42) @@ -1843,8 +1858,8 @@ class TextIOWrapperTest(unittest.TestCase):      def test_encoding(self):          # Check the encoding attribute is always set, and valid          b = self.BytesIO() -        t = self.TextIOWrapper(b, encoding="utf8") -        self.assertEqual(t.encoding, "utf8") +        t = self.TextIOWrapper(b, encoding="utf-8") +        self.assertEqual(t.encoding, "utf-8")          t = self.TextIOWrapper(b)          self.assertTrue(t.encoding is not None)          codecs.lookup(t.encoding) @@ -1937,8 +1952,8 @@ class TextIOWrapperTest(unittest.TestCase):          testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"          normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")          for newline, expected in [ -            (None, normalized.decode("ascii").splitlines(True)), -            ("", testdata.decode("ascii").splitlines(True)), +            (None, normalized.decode("ascii").splitlines(keepends=True)), +            ("", testdata.decode("ascii").splitlines(keepends=True)),              ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),              ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),              ("\r",  ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]), @@ -2023,7 +2038,7 @@ class TextIOWrapperTest(unittest.TestCase):      def test_basic_io(self):          for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65): -            for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le": +            for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":                  f = self.open(support.TESTFN, "w+", encoding=enc)                  f._CHUNK_SIZE = chunksize                  self.assertEqual(f.write("abc"), 3) @@ -2073,7 +2088,7 @@ class TextIOWrapperTest(unittest.TestCase):          self.assertEqual(rlines, wlines)      def test_telling(self): -        f = self.open(support.TESTFN, "w+", encoding="utf8") +        f = self.open(support.TESTFN, "w+", encoding="utf-8")          p0 = f.tell()          f.write("\xff\n")          p1 = f.tell() @@ -2319,6 +2334,7 @@ class TextIOWrapperTest(unittest.TestCase):          with self.open(support.TESTFN, "w", errors="replace") as f:              self.assertEqual(f.errors, "replace") +    @support.no_tracing      @unittest.skipUnless(threading, 'Threading required for this test.')      def test_threads_write(self):          # Issue6750: concurrent writes could duplicate data @@ -2636,12 +2652,6 @@ class MiscIOTest(unittest.TestCase):      def test_blockingioerror(self):          # Various BlockingIOError issues -        self.assertRaises(TypeError, self.BlockingIOError) -        self.assertRaises(TypeError, self.BlockingIOError, 1) -        self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4) -        self.assertRaises(TypeError, self.BlockingIOError, 1, "", None) -        b = self.BlockingIOError(1, "") -        self.assertEqual(b.characters_written, 0)          class C(str):              pass          c = C("") @@ -2763,14 +2773,14 @@ class SignalsTest(unittest.TestCase):          1/0      @unittest.skipUnless(threading, 'Threading required for this test.') -    @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'), -                     'issue #12429: skip test on FreeBSD <= 7')      def check_interrupted_write(self, item, bytes, **fdopen_kwargs):          """Check that a partial write, when it gets interrupted, properly          invokes the signal handler, and bubbles up the exception raised          in the latter."""          read_results = []          def _read(): +            if hasattr(signal, 'pthread_sigmask'): +                signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])              s = os.read(r, 1)              read_results.append(s)          t = threading.Thread(target=_read) @@ -2788,7 +2798,7 @@ class SignalsTest(unittest.TestCase):              # The buffered IO layer must check for pending signal              # handlers, which in this case will invoke alarm_interrupt().              self.assertRaises(ZeroDivisionError, -                              wio.write, item * (1024 * 1024)) +                        wio.write, item * (support.PIPE_MAX_SIZE // len(item)))              t.join()              # We got one byte, get another one and check that it isn't a              # repeat of the first one. @@ -2815,6 +2825,7 @@ class SignalsTest(unittest.TestCase):      def test_interrupted_write_text(self):          self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii") +    @support.no_tracing      def check_reentrant_write(self, data, **fdopen_kwargs):          def on_alarm(*args):              # Will be called reentrantly from the same thread  | 
