diff options
author | Ka-Ping Yee <ping@zesty.ca> | 2008-03-18 04:51:32 (GMT) |
---|---|---|
committer | Ka-Ping Yee <ping@zesty.ca> | 2008-03-18 04:51:32 (GMT) |
commit | f44c7e8996d8115739b52fa52896f2f9f7d94142 (patch) | |
tree | e25ae7401c2d6d21c3b75e8990df79b7dca52949 /Lib/test/test_io.py | |
parent | b5dc90b5faee55dc42e2034b7510e972ac090fdb (diff) | |
download | cpython-f44c7e8996d8115739b52fa52896f2f9f7d94142.zip cpython-f44c7e8996d8115739b52fa52896f2f9f7d94142.tar.gz cpython-f44c7e8996d8115739b52fa52896f2f9f7d94142.tar.bz2 |
Make TextIOWrapper's seek/tell work properly with stateful decoders;
document and rename things to make seek/tell workings a little clearer.
Add a weird decoder for testing TextIOWrapper's seek/tell methods.
Document the getstate/setstate protocol conventions for IncrementalDecoders.
Diffstat (limited to 'Lib/test/test_io.py')
-rw-r--r-- | Lib/test/test_io.py | 171 |
1 files changed, 171 insertions, 0 deletions
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py index 0bc2b48..49404e1 100644 --- a/Lib/test/test_io.py +++ b/Lib/test/test_io.py @@ -8,6 +8,7 @@ import unittest from itertools import chain from test import test_support +import codecs import io # The module under test @@ -486,6 +487,122 @@ class BufferedRandomTest(unittest.TestCase): self.assertEquals(b"fl", rw.read(11)) self.assertRaises(TypeError, rw.seek, 0.0) +# To fully exercise seek/tell, the StatefulIncrementalDecoder has these +# properties: +# - A single output character can correspond to many bytes of input. +# - The number of input bytes to complete the character can be +# undetermined until the last input byte is received. +# - The number of input bytes can vary depending on previous input. +# - A single input byte can correspond to many characters of output. +# - The number of output characters can be undetermined until the +# last input byte is received. +# - The number of output characters can vary depending on previous input. + +class StatefulIncrementalDecoder(codecs.IncrementalDecoder): + """ + For testing seek/tell behavior with a stateful, buffering decoder. + + Input is a sequence of words. Words may be fixed-length (length set + by input) or variable-length (period-terminated). In variable-length + mode, extra periods are ignored. Possible words are: + - 'i' followed by a number sets the input length, I (maximum 99). + When I is set to 0, words are space-terminated. + - 'o' followed by a number sets the output length, O (maximum 99). + - Any other word is converted into a word followed by a period on + the output. The output word consists of the input word truncated + or padded out with hyphens to make its length equal to O. If O + is 0, the word is output verbatim without truncating or padding. + I and O are initially set to 1. When I changes, any buffered input is + re-scanned according to the new I. EOF also terminates the last word. + """ + + def __init__(self, errors='strict'): + codecs.IncrementalEncoder.__init__(self, errors) + self.reset() + + def __repr__(self): + return '<SID %x>' % id(self) + + def reset(self): + self.i = 1 + self.o = 1 + self.buffer = bytearray() + + def getstate(self): + i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset() + return bytes(self.buffer), i*100 + o + + def setstate(self, state): + buffer, io = state + self.buffer = bytearray(buffer) + i, o = divmod(io, 100) + self.i, self.o = i ^ 1, o ^ 1 + + def decode(self, input, final=False): + output = '' + for b in input: + if self.i == 0: # variable-length, terminated with period + if b == ord('.'): + if self.buffer: + output += self.process_word() + else: + self.buffer.append(b) + else: # fixed-length, terminate after self.i bytes + self.buffer.append(b) + if len(self.buffer) == self.i: + output += self.process_word() + if final and self.buffer: # EOF terminates the last word + output += self.process_word() + return output + + def process_word(self): + output = '' + if self.buffer[0] == ord('i'): + self.i = min(99, int(self.buffer[1:] or 0)) # set input length + elif self.buffer[0] == ord('o'): + self.o = min(99, int(self.buffer[1:] or 0)) # set output length + else: + output = self.buffer.decode('ascii') + if len(output) < self.o: + output += '-'*self.o # pad out with hyphens + if self.o: + output = output[:self.o] # truncate to output length + output += '.' + self.buffer = bytearray() + return output + +class StatefulIncrementalDecoderTest(unittest.TestCase): + """ + Make sure the StatefulIncrementalDecoder actually works. + """ + + test_cases = [ + # I=1 fixed-length mode + (b'abcd', False, 'a.b.c.d.'), + # I=0, O=0, variable-length mode + (b'oiabcd', True, 'abcd.'), + # I=0, O=0, variable-length mode, should ignore extra periods + (b'oi...abcd...', True, 'abcd.'), + # I=0, O=6 + (b'i.o6.xyz.', False, 'xyz---.'), + # I=2, O=6 + (b'i.i2.o6xyz', True, 'xy----.z-----.'), + # I=0, O=3 + (b'i.o3.x.xyz.toolong.', False, 'x--.xyz.too.'), + # I=6, O=3 + (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.') + ] + + def testDecoder(self): + # Try a few one-shot test cases. + for input, eof, output in self.test_cases: + d = StatefulIncrementalDecoder() + self.assertEquals(d.decode(input, eof), output) + + # Also test an unfinished decode, followed by forcing EOF. + d = StatefulIncrementalDecoder() + self.assertEquals(d.decode(b'oiabcd'), '') + self.assertEquals(d.decode(b'', 1), 'abcd.') class TextIOWrapperTest(unittest.TestCase): @@ -765,6 +882,60 @@ class TextIOWrapperTest(unittest.TestCase): f.readline() f.tell() + def testSeekAndTell(self): + """Test seek/tell using the StatefulIncrementalDecoder.""" + + def lookupTestDecoder(name): + if self.codecEnabled and name == 'test_decoder': + return codecs.CodecInfo( + name='test_decoder', encode=None, decode=None, + incrementalencoder=None, + streamreader=None, streamwriter=None, + incrementaldecoder=StatefulIncrementalDecoder) + + def testSeekAndTellWithData(data, min_pos=0): + """Tell/seek to various points within a data stream and ensure + that the decoded data returned by read() is consistent.""" + f = io.open(test_support.TESTFN, 'wb') + f.write(data) + f.close() + f = io.open(test_support.TESTFN, encoding='test_decoder') + decoded = f.read() + f.close() + + for i in range(min_pos, len(decoded) + 1): # seek positions + for j in [1, 5, len(decoded) - i]: # read lengths + f = io.open(test_support.TESTFN, encoding='test_decoder') + self.assertEquals(f.read(i), decoded[:i]) + cookie = f.tell() + self.assertEquals(f.read(j), decoded[i:i + j]) + f.seek(cookie) + self.assertEquals(f.read(), decoded[i:]) + f.close() + + # Register a special incremental decoder for testing. + codecs.register(lookupTestDecoder) + self.codecEnabled = 1 + + # Run the tests. + try: + # Try each test case. + for input, _, _ in StatefulIncrementalDecoderTest.test_cases: + testSeekAndTellWithData(input) + + # Position each test case so that it crosses a chunk boundary. + CHUNK_SIZE = io.TextIOWrapper._CHUNK_SIZE + for input, _, _ in StatefulIncrementalDecoderTest.test_cases: + offset = CHUNK_SIZE - len(input)//2 + prefix = b'.'*offset + # Don't bother seeking into the prefix (takes too long). + min_pos = offset*2 + testSeekAndTellWithData(prefix + input, min_pos) + + # Ensure our test decoder won't interfere with subsequent tests. + finally: + self.codecEnabled = 0 + def testEncodedWrites(self): data = "1234567890" tests = ("utf-16", |