diff options
Diffstat (limited to 'Lib')
-rw-r--r-- | Lib/codecs.py | 13 | ||||
-rw-r--r-- | Lib/encodings/utf_16.py | 20 | ||||
-rw-r--r-- | Lib/encodings/utf_32.py | 20 | ||||
-rw-r--r-- | Lib/test/test_codecs.py | 40 |
4 files changed, 74 insertions, 19 deletions
diff --git a/Lib/codecs.py b/Lib/codecs.py index 9490602..f6c2448 100644 --- a/Lib/codecs.py +++ b/Lib/codecs.py @@ -374,6 +374,11 @@ class StreamWriter(Codec): """ pass + def seek(self, offset, whence=0): + self.stream.seek(offset, whence) + if whence == 0 and offset == 0: + self.reset() + def __getattr__(self, name, getattr=getattr): @@ -606,8 +611,8 @@ class StreamReader(Codec): Resets the codec buffers used for keeping state. """ - self.reset() self.stream.seek(offset, whence) + self.reset() def __next__(self): @@ -700,8 +705,10 @@ class StreamReaderWriter: self.writer.reset() def seek(self, offset, whence=0): - self.reader.seek(offset, whence) - self.writer.seek(offset, whence) + self.stream.seek(offset, whence) + self.reader.reset() + if whence == 0 and offset == 0: + self.writer.reset() def __getattr__(self, name, getattr=getattr): diff --git a/Lib/encodings/utf_16.py b/Lib/encodings/utf_16.py index 5500c06..809bc9a 100644 --- a/Lib/encodings/utf_16.py +++ b/Lib/encodings/utf_16.py @@ -103,17 +103,23 @@ class IncrementalDecoder(codecs.BufferedIncrementalDecoder): class StreamWriter(codecs.StreamWriter): def __init__(self, stream, errors='strict'): - self.bom_written = False codecs.StreamWriter.__init__(self, stream, errors) + self.encoder = None + + def reset(self): + codecs.StreamWriter.reset(self) + self.encoder = None def encode(self, input, errors='strict'): - self.bom_written = True - result = codecs.utf_16_encode(input, errors) - if sys.byteorder == 'little': - self.encode = codecs.utf_16_le_encode + if self.encoder is None: + result = codecs.utf_16_encode(input, errors) + if sys.byteorder == 'little': + self.encoder = codecs.utf_16_le_encode + else: + self.encoder = codecs.utf_16_be_encode + return result else: - self.encode = codecs.utf_16_be_encode - return result + return self.encoder(input, errors) class StreamReader(codecs.StreamReader): diff --git a/Lib/encodings/utf_32.py b/Lib/encodings/utf_32.py index f0b7709..c052928 100644 --- a/Lib/encodings/utf_32.py +++ b/Lib/encodings/utf_32.py @@ -98,17 +98,23 @@ class IncrementalDecoder(codecs.BufferedIncrementalDecoder): class StreamWriter(codecs.StreamWriter): def __init__(self, stream, errors='strict'): - self.bom_written = False + self.encoder = None codecs.StreamWriter.__init__(self, stream, errors) + def reset(self): + codecs.StreamWriter.reset(self) + self.encoder = None + def encode(self, input, errors='strict'): - self.bom_written = True - result = codecs.utf_32_encode(input, errors) - if sys.byteorder == 'little': - self.encode = codecs.utf_32_le_encode + if self.encoder is None: + result = codecs.utf_32_encode(input, errors) + if sys.byteorder == 'little': + self.encoder = codecs.utf_32_le_encode + else: + self.encoder = codecs.utf_32_be_encode + return result else: - self.encode = codecs.utf_32_be_encode - return result + return self.encoder(input, errors) class StreamReader(codecs.StreamReader): diff --git a/Lib/test/test_codecs.py b/Lib/test/test_codecs.py index 6e7afc4..598aaa2 100644 --- a/Lib/test/test_codecs.py +++ b/Lib/test/test_codecs.py @@ -1604,8 +1604,8 @@ class BomTest(unittest.TestCase): "utf-32-le", "utf-32-be") for encoding in tests: - with codecs.open('foo', 'w+', encoding=encoding) as f: - # Check if the BOM is written only once + # Check if the BOM is written only once + with codecs.open(support.TESTFN, 'w+', encoding=encoding) as f: f.write(data) f.write(data) f.seek(0) @@ -1613,6 +1613,42 @@ class BomTest(unittest.TestCase): f.seek(0) self.assertEquals(f.read(), data * 2) + # Check that the BOM is written after a seek(0) + with codecs.open(support.TESTFN, 'w+', encoding=encoding) as f: + f.write(data[0]) + self.assertNotEquals(f.tell(), 0) + f.seek(0) + f.write(data) + f.seek(0) + self.assertEquals(f.read(), data) + + # (StreamWriter) Check that the BOM is written after a seek(0) + with codecs.open(support.TESTFN, 'w+', encoding=encoding) as f: + f.writer.write(data[0]) + self.assertNotEquals(f.writer.tell(), 0) + f.writer.seek(0) + f.writer.write(data) + f.seek(0) + self.assertEquals(f.read(), data) + + # Check that the BOM is not written after a seek() at a position + # different than the start + with codecs.open(support.TESTFN, 'w+', encoding=encoding) as f: + f.write(data) + f.seek(f.tell()) + f.write(data) + f.seek(0) + self.assertEquals(f.read(), data * 2) + + # (StreamWriter) Check that the BOM is not written after a seek() + # at a position different than the start + with codecs.open(support.TESTFN, 'w+', encoding=encoding) as f: + f.writer.write(data) + f.writer.seek(f.writer.tell()) + f.writer.write(data) + f.seek(0) + self.assertEquals(f.read(), data * 2) + def test_main(): support.run_unittest( |