summary refs log tree commit diff stats
path: root/Lib
diff options
context:
space:
mode:
Diffstat (limited to 'Lib')
-rw-r--r-- Lib/codecs.py 13
-rw-r--r-- Lib/encodings/utf_16.py 20
-rw-r--r-- Lib/encodings/utf_32.py 20
-rw-r--r-- Lib/test/test_codecs.py 40
4 files changed, 74 insertions, 19 deletions
diff --git a/Lib/codecs.py b/Lib/codecs.py
index 9490602..f6c2448 100644
--- a/Lib/codecs.py
+++ b/Lib/codecs.py
@@ -374,6 +374,11 @@ class StreamWriter(Codec):
"""
pass
+ def seek(self, offset, whence=0):
+ self.stream.seek(offset, whence)
+ if whence == 0 and offset == 0:
+ self.reset()
+
def __getattr__(self, name,
getattr=getattr):
@@ -606,8 +611,8 @@ class StreamReader(Codec):
Resets the codec buffers used for keeping state.
"""
- self.reset()
self.stream.seek(offset, whence)
+ self.reset()
def __next__(self):
@@ -700,8 +705,10 @@ class StreamReaderWriter:
self.writer.reset()
def seek(self, offset, whence=0):
- self.reader.seek(offset, whence)
- self.writer.seek(offset, whence)
+ self.stream.seek(offset, whence)
+ self.reader.reset()
+ if whence == 0 and offset == 0:
+ self.writer.reset()
def __getattr__(self, name,
getattr=getattr):
diff --git a/Lib/encodings/utf_16.py b/Lib/encodings/utf_16.py
index 5500c06..809bc9a 100644
--- a/Lib/encodings/utf_16.py
+++ b/Lib/encodings/utf_16.py
@@ -103,17 +103,23 @@ class IncrementalDecoder(codecs.BufferedIncrementalDecoder):
class StreamWriter(codecs.StreamWriter):
def __init__(self, stream, errors='strict'):
- self.bom_written = False
codecs.StreamWriter.__init__(self, stream, errors)
+ self.encoder = None
+
+ def reset(self):
+ codecs.StreamWriter.reset(self)
+ self.encoder = None
def encode(self, input, errors='strict'):
- self.bom_written = True
- result = codecs.utf_16_encode(input, errors)
- if sys.byteorder == 'little':
- self.encode = codecs.utf_16_le_encode
+ if self.encoder is None:
+ result = codecs.utf_16_encode(input, errors)
+ if sys.byteorder == 'little':
+ self.encoder = codecs.utf_16_le_encode
+ else:
+ self.encoder = codecs.utf_16_be_encode
+ return result
else:
- self.encode = codecs.utf_16_be_encode
- return result
+ return self.encoder(input, errors)
class StreamReader(codecs.StreamReader):
diff --git a/Lib/encodings/utf_32.py b/Lib/encodings/utf_32.py
index f0b7709..c052928 100644
--- a/Lib/encodings/utf_32.py
+++ b/Lib/encodings/utf_32.py
@@ -98,17 +98,23 @@ class IncrementalDecoder(codecs.BufferedIncrementalDecoder):
class StreamWriter(codecs.StreamWriter):
def __init__(self, stream, errors='strict'):
- self.bom_written = False
+ self.encoder = None
codecs.StreamWriter.__init__(self, stream, errors)
+ def reset(self):
+ codecs.StreamWriter.reset(self)
+ self.encoder = None
+
def encode(self, input, errors='strict'):
- self.bom_written = True
- result = codecs.utf_32_encode(input, errors)
- if sys.byteorder == 'little':
- self.encode = codecs.utf_32_le_encode
+ if self.encoder is None:
+ result = codecs.utf_32_encode(input, errors)
+ if sys.byteorder == 'little':
+ self.encoder = codecs.utf_32_le_encode
+ else:
+ self.encoder = codecs.utf_32_be_encode
+ return result
else:
- self.encode = codecs.utf_32_be_encode
- return result
+ return self.encoder(input, errors)
class StreamReader(codecs.StreamReader):
diff --git a/Lib/test/test_codecs.py b/Lib/test/test_codecs.py
index 6e7afc4..598aaa2 100644
--- a/Lib/test/test_codecs.py
+++ b/Lib/test/test_codecs.py
@@ -1604,8 +1604,8 @@ class BomTest(unittest.TestCase):
"utf-32-le",
"utf-32-be")
for encoding in tests:
- with codecs.open('foo', 'w+', encoding=encoding) as f:
- # Check if the BOM is written only once
+ # Check if the BOM is written only once
+ with codecs.open(support.TESTFN, 'w+', encoding=encoding) as f:
f.write(data)
f.write(data)
f.seek(0)
@@ -1613,6 +1613,42 @@ class BomTest(unittest.TestCase):
f.seek(0)
self.assertEquals(f.read(), data * 2)
+ # Check that the BOM is written after a seek(0)
+ with codecs.open(support.TESTFN, 'w+', encoding=encoding) as f:
+ f.write(data[0])
+ self.assertNotEquals(f.tell(), 0)
+ f.seek(0)
+ f.write(data)
+ f.seek(0)
+ self.assertEquals(f.read(), data)
+
+ # (StreamWriter) Check that the BOM is written after a seek(0)
+ with codecs.open(support.TESTFN, 'w+', encoding=encoding) as f:
+ f.writer.write(data[0])
+ self.assertNotEquals(f.writer.tell(), 0)
+ f.writer.seek(0)
+ f.writer.write(data)
+ f.seek(0)
+ self.assertEquals(f.read(), data)
+
+ # Check that the BOM is not written after a seek() at a position
+ # different than the start
+ with codecs.open(support.TESTFN, 'w+', encoding=encoding) as f:
+ f.write(data)
+ f.seek(f.tell())
+ f.write(data)
+ f.seek(0)
+ self.assertEquals(f.read(), data * 2)
+
+ # (StreamWriter) Check that the BOM is not written after a seek()
+ # at a position different than the start
+ with codecs.open(support.TESTFN, 'w+', encoding=encoding) as f:
+ f.writer.write(data)
+ f.writer.seek(f.writer.tell())
+ f.writer.write(data)
+ f.seek(0)
+ self.assertEquals(f.read(), data * 2)
+
def test_main():
support.run_unittest(