summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_io.py
diff options
context:
space:
mode:
authorKa-Ping Yee <ping@zesty.ca>2008-03-18 04:51:32 (GMT)
committerKa-Ping Yee <ping@zesty.ca>2008-03-18 04:51:32 (GMT)
commitf44c7e8996d8115739b52fa52896f2f9f7d94142 (patch)
treee25ae7401c2d6d21c3b75e8990df79b7dca52949 /Lib/test/test_io.py
parentb5dc90b5faee55dc42e2034b7510e972ac090fdb (diff)
downloadcpython-f44c7e8996d8115739b52fa52896f2f9f7d94142.zip
cpython-f44c7e8996d8115739b52fa52896f2f9f7d94142.tar.gz
cpython-f44c7e8996d8115739b52fa52896f2f9f7d94142.tar.bz2
Make TextIOWrapper's seek/tell work properly with stateful decoders;
document and rename things to make seek/tell workings a little clearer. Add a weird decoder for testing TextIOWrapper's seek/tell methods. Document the getstate/setstate protocol conventions for IncrementalDecoders.
Diffstat (limited to 'Lib/test/test_io.py')
-rw-r--r--Lib/test/test_io.py171
1 files changed, 171 insertions, 0 deletions
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py
index 0bc2b48..49404e1 100644
--- a/Lib/test/test_io.py
+++ b/Lib/test/test_io.py
@@ -8,6 +8,7 @@ import unittest
from itertools import chain
from test import test_support
+import codecs
import io # The module under test
@@ -486,6 +487,122 @@ class BufferedRandomTest(unittest.TestCase):
self.assertEquals(b"fl", rw.read(11))
self.assertRaises(TypeError, rw.seek, 0.0)
+# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
+# properties:
+# - A single output character can correspond to many bytes of input.
+# - The number of input bytes to complete the character can be
+# undetermined until the last input byte is received.
+# - The number of input bytes can vary depending on previous input.
+# - A single input byte can correspond to many characters of output.
+# - The number of output characters can be undetermined until the
+# last input byte is received.
+# - The number of output characters can vary depending on previous input.
+
+class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
+    """
+    For testing seek/tell behavior with a stateful, buffering decoder.
+
+    Input is a sequence of words.  Words may be fixed-length (length set
+    by input) or variable-length (period-terminated).  In variable-length
+    mode, extra periods are ignored.  Possible words are:
+      - 'i' followed by a number sets the input length, I (maximum 99).
+        When I is set to 0, words are period-terminated.
+      - 'o' followed by a number sets the output length, O (maximum 99).
+      - Any other word is converted into a word followed by a period on
+        the output.  The output word consists of the input word truncated
+        or padded out with hyphens to make its length equal to O.  If O
+        is 0, the word is output verbatim without truncating or padding.
+    I and O are initially set to 1.  When I changes, any buffered input is
+    re-scanned according to the new I.  EOF also terminates the last word.
+    """
+
+    def __init__(self, errors='strict'):
+        # Initialise the decoder base class (not IncrementalEncoder).
+        codecs.IncrementalDecoder.__init__(self, errors)
+        self.reset()
+
+    def __repr__(self):
+        return '<SID %x>' % id(self)
+
+    def reset(self):
+        """Return to the initial state: I = 1, O = 1, nothing buffered."""
+        self.i = 1
+        self.o = 1
+        self.buffer = bytearray()
+
+    def getstate(self):
+        """Return (buffered input as bytes, integer flags encoding I and O)."""
+        i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
+        return bytes(self.buffer), i*100 + o
+
+    def setstate(self, state):
+        """Restore a state previously produced by getstate()."""
+        buffer, io = state
+        self.buffer = bytearray(buffer)
+        # Undo the packing done in getstate(): split the flags, then un-XOR.
+        i, o = divmod(io, 100)
+        self.i, self.o = i ^ 1, o ^ 1
+
+    def decode(self, input, final=False):
+        """Consume the bytes in input and return the text decoded so far.
+
+        Bytes of an incomplete word stay in self.buffer; when final is
+        true, any buffered bytes are processed as the last word.
+        """
+        output = ''
+        for b in input:
+            if self.i == 0: # variable-length, terminated with period
+                if b == ord('.'):
+                    if self.buffer:
+                        output += self.process_word()
+                else:
+                    self.buffer.append(b)
+            else: # fixed-length, terminate after self.i bytes
+                self.buffer.append(b)
+                if len(self.buffer) == self.i:
+                    output += self.process_word()
+        if final and self.buffer: # EOF terminates the last word
+            output += self.process_word()
+        return output
+
+    def process_word(self):
+        """Interpret the buffered word, clear the buffer, return its output.
+
+        'i...' and 'o...' words update I and O and produce no output; any
+        other word is returned padded/truncated to O and followed by '.'.
+        """
+        output = ''
+        if self.buffer[0] == ord('i'):
+            self.i = min(99, int(self.buffer[1:] or 0)) # set input length
+        elif self.buffer[0] == ord('o'):
+            self.o = min(99, int(self.buffer[1:] or 0)) # set output length
+        else:
+            output = self.buffer.decode('ascii')
+            if len(output) < self.o:
+                output += '-'*self.o # pad out with hyphens
+            if self.o:
+                output = output[:self.o] # truncate to output length
+            output += '.'
+        self.buffer = bytearray()
+        return output
+
+class StatefulIncrementalDecoderTest(unittest.TestCase):
+    """
+    Make sure the StatefulIncrementalDecoder actually works.
+    """
+
+    # Each entry: (input bytes, 'final' flag for decode(), expected text).
+    test_cases = [
+        # I=1 fixed-length mode
+        (b'abcd', False, 'a.b.c.d.'),
+        # I=0, O=0, variable-length mode
+        (b'oiabcd', True, 'abcd.'),
+        # I=0, O=0, variable-length mode, should ignore extra periods
+        (b'oi...abcd...', True, 'abcd.'),
+        # I=0, O=6
+        (b'i.o6.xyz.', False, 'xyz---.'),
+        # I=2, O=6
+        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
+        # I=0, O=3
+        (b'i.o3.x.xyz.toolong.', False, 'x--.xyz.too.'),
+        # I=6, O=3
+        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.')
+    ]
+
+    def testDecoder(self):
+        # One-shot decoding: a fresh decoder for every table entry.
+        for data, final, expected in self.test_cases:
+            decoder = StatefulIncrementalDecoder()
+            self.assertEquals(decoder.decode(data, final), expected)
+
+        # An unfinished decode yields nothing until EOF is forced.
+        decoder = StatefulIncrementalDecoder()
+        self.assertEquals(decoder.decode(b'oiabcd'), '')
+        self.assertEquals(decoder.decode(b'', 1), 'abcd.')
class TextIOWrapperTest(unittest.TestCase):
@@ -765,6 +882,60 @@ class TextIOWrapperTest(unittest.TestCase):
f.readline()
f.tell()
+    def testSeekAndTell(self):
+        """Test seek/tell using the StatefulIncrementalDecoder."""
+
+        def lookupTestDecoder(name):
+            # Codec search function: resolves 'test_decoder' only while
+            # self.codecEnabled is true; otherwise returns None.
+            if self.codecEnabled and name == 'test_decoder':
+                return codecs.CodecInfo(
+                    name='test_decoder', encode=None, decode=None,
+                    incrementalencoder=None,
+                    streamreader=None, streamwriter=None,
+                    incrementaldecoder=StatefulIncrementalDecoder)
+
+        def testSeekAndTellWithData(data, min_pos=0):
+            """Tell/seek to various points within a data stream and ensure
+            that the decoded data returned by read() is consistent."""
+            # Every file is closed in a finally clause so that a failed
+            # assertion or read error does not leak the handle.
+            f = io.open(test_support.TESTFN, 'wb')
+            try:
+                f.write(data)
+            finally:
+                f.close()
+            f = io.open(test_support.TESTFN, encoding='test_decoder')
+            try:
+                decoded = f.read()
+            finally:
+                f.close()
+
+            for i in range(min_pos, len(decoded) + 1): # seek positions
+                for j in [1, 5, len(decoded) - i]: # read lengths
+                    f = io.open(test_support.TESTFN, encoding='test_decoder')
+                    try:
+                        self.assertEquals(f.read(i), decoded[:i])
+                        cookie = f.tell()
+                        self.assertEquals(f.read(j), decoded[i:i + j])
+                        # Seeking back to the cookie must replay the same text.
+                        f.seek(cookie)
+                        self.assertEquals(f.read(), decoded[i:])
+                    finally:
+                        f.close()
+
+        # Register a special incremental decoder for testing.
+        codecs.register(lookupTestDecoder)
+        self.codecEnabled = 1
+
+        # Run the tests.
+        try:
+            # Try each test case.
+            for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
+                testSeekAndTellWithData(input)
+
+            # Position each test case so that it crosses a chunk boundary.
+            CHUNK_SIZE = io.TextIOWrapper._CHUNK_SIZE
+            for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
+                offset = CHUNK_SIZE - len(input)//2
+                prefix = b'.'*offset
+                # Don't bother seeking into the prefix (takes too long).
+                min_pos = offset*2
+                testSeekAndTellWithData(prefix + input, min_pos)
+
+        # Ensure our test decoder won't interfere with subsequent tests.
+        finally:
+            self.codecEnabled = 0
+
def testEncodedWrites(self):
data = "1234567890"
tests = ("utf-16",