summaryrefslogtreecommitdiffstats
path: root/Lib/test
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test')
-rw-r--r--Lib/test/test_io.py128
1 files changed, 120 insertions, 8 deletions
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py
index dace642..697f69e 100644
--- a/Lib/test/test_io.py
+++ b/Lib/test/test_io.py
@@ -489,6 +489,10 @@ class BufferedRandomTest(unittest.TestCase):
class TextIOWrapperTest(unittest.TestCase):
+ def setUp(self):
+ self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
+ self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
+
def tearDown(self):
test_support.unlink(test_support.TESTFN)
@@ -496,14 +500,14 @@ class TextIOWrapperTest(unittest.TestCase):
testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
for newline, expected in [
- (None, normalized.decode("ASCII").splitlines(True)),
- ("", testdata.decode("ASCII").splitlines(True)),
+ (None, normalized.decode("ascii").splitlines(True)),
+ ("", testdata.decode("ascii").splitlines(True)),
("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
]:
buf = io.BytesIO(testdata)
- txt = io.TextIOWrapper(buf, encoding="ASCII", newline=newline)
+ txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
self.assertEquals(txt.readlines(), expected)
txt.seek(0)
self.assertEquals(txt.read(), "".join(expected))
@@ -518,7 +522,7 @@ class TextIOWrapperTest(unittest.TestCase):
tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
for newline, expected in tests:
buf = io.BytesIO()
- txt = io.TextIOWrapper(buf, encoding="ASCII", newline=newline)
+ txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
txt.write("AAA\nB")
txt.write("BB\nCCC\n")
txt.write("X\rY\r\nZ")
@@ -568,14 +572,14 @@ class TextIOWrapperTest(unittest.TestCase):
testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
for newline, expected in [
- (None, normalized.decode("ASCII").splitlines(True)),
- ("", testdata.decode("ASCII").splitlines(True)),
+ (None, normalized.decode("ascii").splitlines(True)),
+ ("", testdata.decode("ascii").splitlines(True)),
("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
]:
buf = io.BytesIO(testdata)
- txt = io.TextIOWrapper(buf, encoding="ASCII", newline=newline)
+ txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
self.assertEquals(txt.readlines(), expected)
txt.seek(0)
self.assertEquals(txt.read(), "".join(expected))
@@ -600,7 +604,7 @@ class TextIOWrapperTest(unittest.TestCase):
("\r\n", "\r\n", data_crlf),
]:
buf = io.BytesIO()
- txt = io.TextIOWrapper(buf, encoding="ASCII", newline=newline)
+ txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
txt.write(data)
txt.close()
self.assertEquals(buf.getvalue(), expected)
@@ -745,6 +749,114 @@ class TextIOWrapperTest(unittest.TestCase):
print("Reading using readline(): %6.3f seconds" % (t3-t2))
print("Using readline()+tell(): %6.3f seconds" % (t4-t3))
+ def testReadOneByOne(self):
+ txt = io.TextIOWrapper(io.BytesIO(b"AA\r\nBB"))
+ reads = ""
+ while True:
+ c = txt.read(1)
+ if not c:
+ break
+ reads += c
+ self.assertEquals(reads, "AA\nBB")
+
+ # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
+ def testReadByChunk(self):
+ # make sure "\r\n" straddles 128 char boundary.
+ txt = io.TextIOWrapper(io.BytesIO(b"A" * 127 + b"\r\nB"))
+ reads = ""
+ while True:
+ c = txt.read(128)
+ if not c:
+ break
+ reads += c
+ self.assertEquals(reads, "A"*127+"\nB")
+
+ def test_issue1395_1(self):
+ txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
+
+ # read one char at a time
+ reads = ""
+ while True:
+ c = txt.read(1)
+ if not c:
+ break
+ reads += c
+ self.assertEquals(reads, self.normalized)
+
+ def test_issue1395_2(self):
+ txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
+ txt._CHUNK_SIZE = 4
+
+ reads = ""
+ while True:
+ c = txt.read(4)
+ if not c:
+ break
+ reads += c
+ self.assertEquals(reads, self.normalized)
+
+ def test_issue1395_3(self):
+ txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
+ txt._CHUNK_SIZE = 4
+
+ reads = txt.read(4)
+ reads += txt.read(4)
+ reads += txt.readline()
+ reads += txt.readline()
+ reads += txt.readline()
+ self.assertEquals(reads, self.normalized)
+
+ def test_issue1395_4(self):
+ txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
+ txt._CHUNK_SIZE = 4
+
+ reads = txt.read(4)
+ reads += txt.read()
+ self.assertEquals(reads, self.normalized)
+
+ def test_issue1395_5(self):
+ txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
+ txt._CHUNK_SIZE = 4
+
+ reads = txt.read(4)
+ pos = txt.tell()
+ txt.seek(0)
+ txt.seek(pos)
+ self.assertEquals(txt.read(4), "BBB\n")
+
+ def test_newline_decoder(self):
+ import codecs
+ decoder = codecs.getincrementaldecoder("utf-8")()
+ decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
+
+ self.assertEquals(decoder.decode(b'\xe8\xa2\x88'), "\u8888")
+
+ self.assertEquals(decoder.decode(b'\xe8'), "")
+ self.assertEquals(decoder.decode(b'\xa2'), "")
+ self.assertEquals(decoder.decode(b'\x88'), "\u8888")
+
+ self.assertEquals(decoder.decode(b'\xe8'), "")
+ self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
+
+ decoder.setstate((b'', 0))
+ self.assertEquals(decoder.decode(b'\n'), "\n")
+ self.assertEquals(decoder.decode(b'\r'), "")
+ self.assertEquals(decoder.decode(b'', final=True), "\n")
+ self.assertEquals(decoder.decode(b'\r', final=True), "\n")
+
+ self.assertEquals(decoder.decode(b'\r'), "")
+ self.assertEquals(decoder.decode(b'a'), "\na")
+
+ self.assertEquals(decoder.decode(b'\r\r\n'), "\n\n")
+ self.assertEquals(decoder.decode(b'\r'), "")
+ self.assertEquals(decoder.decode(b'\r'), "\n")
+ self.assertEquals(decoder.decode(b'\na'), "\na")
+
+ self.assertEquals(decoder.decode(b'\xe8\xa2\x88\r\n'), "\u8888\n")
+ self.assertEquals(decoder.decode(b'\xe8\xa2\x88'), "\u8888")
+ self.assertEquals(decoder.decode(b'\n'), "\n")
+ self.assertEquals(decoder.decode(b'\xe8\xa2\x88\r'), "\u8888")
+ self.assertEquals(decoder.decode(b'\n'), "\n")
# XXX Tests for open()