diff options
author | Alexandre Vassalotti <alexandre@peadrop.com> | 2008-05-06 19:48:38 (GMT) |
---|---|---|
committer | Alexandre Vassalotti <alexandre@peadrop.com> | 2008-05-06 19:48:38 (GMT) |
commit | 77250f4df7a73a5c87d12d781a562747a855cd95 (patch) | |
tree | 61f83259cd270f6011c75cbe95d2a25d12c29f87 /Lib | |
parent | 5d8da20dd1034081906dbdffea9c77bf39353dec (diff) | |
download | cpython-77250f4df7a73a5c87d12d781a562747a855cd95.zip cpython-77250f4df7a73a5c87d12d781a562747a855cd95.tar.gz cpython-77250f4df7a73a5c87d12d781a562747a855cd95.tar.bz2 |
Added fast alternate io.BytesIO implementation and its test suite.
Removed old test suite for StringIO.
Modified truncate() to imply a seek to given argument value.
Diffstat (limited to 'Lib')
-rw-r--r-- | Lib/io.py | 75 | ||||
-rw-r--r-- | Lib/test/test_StringIO.py | 127 | ||||
-rw-r--r-- | Lib/test/test_io.py | 8 | ||||
-rw-r--r-- | Lib/test/test_largefile.py | 7 | ||||
-rw-r--r-- | Lib/test/test_memoryio.py | 405 | ||||
-rw-r--r-- | Lib/test/test_mimetools.py | 2 |
6 files changed, 481 insertions, 143 deletions
@@ -490,6 +490,7 @@ class IOBase(metaclass=abc.ABCMeta): terminator(s) recognized. """ # For backwards compatibility, a (slowish) readline(). + self._checkClosed() if hasattr(self, "peek"): def nreadahead(): readahead = self.peek(1) @@ -531,7 +532,7 @@ class IOBase(metaclass=abc.ABCMeta): lines will be read if the total size (in bytes/characters) of all lines so far exceeds hint. """ - if hint is None: + if hint is None or hint <= 0: return list(self) n = 0 lines = [] @@ -726,6 +727,8 @@ class _BufferedIOMixin(BufferedIOBase): if pos is None: pos = self.tell() + # XXX: Should seek() be used, instead of passing the position + # XXX directly to truncate? return self.raw.truncate(pos) ### Flush and close ### @@ -765,7 +768,7 @@ class _BufferedIOMixin(BufferedIOBase): return self.raw.isatty() -class BytesIO(BufferedIOBase): +class _BytesIO(BufferedIOBase): """Buffered I/O implementation using an in-memory bytes buffer.""" @@ -779,13 +782,19 @@ class BytesIO(BufferedIOBase): def getvalue(self): """Return the bytes value (contents) of the buffer """ + if self.closed: + raise ValueError("getvalue on closed file") return bytes(self._buffer) def read(self, n=None): + if self.closed: + raise ValueError("read from closed file") if n is None: n = -1 if n < 0: n = len(self._buffer) + if len(self._buffer) <= self._pos: + return self._buffer[:0] newpos = min(len(self._buffer), self._pos + n) b = self._buffer[self._pos : newpos] self._pos = newpos @@ -802,6 +811,8 @@ class BytesIO(BufferedIOBase): if isinstance(b, str): raise TypeError("can't write str to binary stream") n = len(b) + if n == 0: + return 0 newpos = self._pos + n if newpos > len(self._buffer): # Inserts null bytes between the current end of the file @@ -813,28 +824,38 @@ class BytesIO(BufferedIOBase): return n def seek(self, pos, whence=0): + if self.closed: + raise ValueError("seek on closed file") try: pos = pos.__index__() except AttributeError as err: raise TypeError("an integer is required") from err if whence == 0: self._pos = max(0, pos) + if pos < 0: + raise ValueError("negative seek position %r" % (pos,)) elif whence == 1: self._pos = max(0, self._pos + pos) elif whence == 2: self._pos = max(0, len(self._buffer) + pos) else: - raise IOError("invalid whence value") + raise ValueError("invalid whence value") return self._pos def tell(self): + if self.closed: + raise ValueError("tell on closed file") return self._pos def truncate(self, pos=None): + if self.closed: + raise ValueError("truncate on closed file") if pos is None: pos = self._pos + elif pos < 0: + raise ValueError("negative truncate position %r" % (pos,)) del self._buffer[pos:] - return pos + return self.seek(pos) def readable(self): return True @@ -845,6 +866,16 @@ class BytesIO(BufferedIOBase): def seekable(self): return True +# Use the faster implementation of BytesIO if available +try: + import _bytesio + + class BytesIO(_bytesio._BytesIO, BufferedIOBase): + __doc__ = _bytesio._BytesIO.__doc__ + +except ImportError: + BytesIO = _BytesIO + class BufferedReader(_BufferedIOMixin): @@ -978,6 +1009,12 @@ class BufferedWriter(_BufferedIOMixin): raise BlockingIOError(e.errno, e.strerror, overage) return written + def truncate(self, pos=None): + self.flush() + if pos is None: + pos = self.raw.tell() + return self.raw.truncate(pos) + def flush(self): if self.closed: raise ValueError("flush of closed file") @@ -1097,6 +1134,13 @@ class BufferedRandom(BufferedWriter, BufferedReader): else: return self.raw.tell() - len(self._read_buf) + def truncate(self, pos=None): + if pos is None: + pos = self.tell() + # Use seek to flush the read buffer. + self.seek(pos) + return BufferedWriter.truncate(self) + def read(self, n=None): if n is None: n = -1 @@ -1145,11 +1189,7 @@ class TextIOBase(IOBase): def truncate(self, pos: int = None) -> int: """Truncate size to pos.""" - self.flush() - if pos is None: - pos = self.tell() - self.seek(pos) - return self.buffer.truncate() + self._unsupported("truncate") def readline(self) -> str: """Read until newline or EOF. @@ -1346,6 +1386,12 @@ class TextIOWrapper(TextIOBase): def seekable(self): return self._seekable + def readable(self): + return self.buffer.readable() + + def writable(self): + return self.buffer.writable() + def flush(self): self.buffer.flush() self._telling = self._seekable @@ -1539,7 +1585,16 @@ class TextIOWrapper(TextIOBase): finally: decoder.setstate(saved_state) + def truncate(self, pos=None): + self.flush() + if pos is None: + pos = self.tell() + self.seek(pos) + return self.buffer.truncate() + def seek(self, cookie, whence=0): + if self.closed: + raise ValueError("tell on closed file") if not self._seekable: raise IOError("underlying stream is not seekable") if whence == 1: # seek relative to current position @@ -1626,6 +1681,8 @@ class TextIOWrapper(TextIOBase): return line def readline(self, limit=None): + if self.closed: + raise ValueError("read from closed file") if limit is None: limit = -1 diff --git a/Lib/test/test_StringIO.py b/Lib/test/test_StringIO.py deleted file mode 100644 index 4f3ce83..0000000 --- a/Lib/test/test_StringIO.py +++ /dev/null @@ -1,127 +0,0 @@ -# Tests StringIO and cStringIO - -import sys -import unittest -import io -from test import test_support - - -class TestGenericStringIO: - # use a class variable CLASS to define which class is being tested - CLASS = None - - # Line of data to test as string - _line = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!' - - # Constructor to use for the test data (._line is passed to this - # constructor) - constructor = str - - def setUp(self): - self._line = self.constructor(self._line) - self._lines = self.constructor((self._line + '\n') * 5) - self._fp = self.CLASS(self._lines) - - def test_reads(self): - eq = self.assertEqual - self.assertRaises(TypeError, self._fp.seek) - eq(self._fp.read(10), self._line[:10]) - eq(self._fp.readline(), self._line[10:] + '\n') - eq(len(self._fp.readlines(60)), 2) - - def test_writes(self): - f = self.CLASS() - self.assertRaises(TypeError, f.seek) - f.write(self._line[:6]) - f.seek(3) - f.write(self._line[20:26]) - f.write(self._line[52]) - self.assertEqual(f.getvalue(), 'abcuvwxyz!') - - def test_writelines(self): - f = self.CLASS() - f.writelines([self._line[0], self._line[1], self._line[2]]) - f.seek(0) - self.assertEqual(f.getvalue(), 'abc') - - def test_writelines_error(self): - def errorGen(): - yield 'a' - raise KeyboardInterrupt() - f = self.CLASS() - self.assertRaises(KeyboardInterrupt, f.writelines, errorGen()) - - def test_truncate(self): - eq = self.assertEqual - f = self.CLASS() - f.write(self._lines) - f.seek(10) - f.truncate() - eq(f.getvalue(), 'abcdefghij') - f.truncate(5) - eq(f.getvalue(), 'abcde') - f.write('xyz') - eq(f.getvalue(), 'abcdexyz') - self.assertRaises(ValueError, f.truncate, -1) - f.close() - self.assertRaises(ValueError, f.write, 'frobnitz') - - def test_closed_flag(self): - f = self.CLASS() - self.assertEqual(f.closed, False) - f.close() - self.assertEqual(f.closed, True) - f = self.CLASS(self.constructor("abc")) - self.assertEqual(f.closed, False) - f.close() - self.assertEqual(f.closed, True) - - def test_isatty(self): - f = self.CLASS() - self.assertRaises(TypeError, f.isatty, None) - self.assertEqual(f.isatty(), False) - f.close() - self.assertRaises(ValueError, f.isatty) - - def test_iterator(self): - eq = self.assertEqual - unless = self.failUnless - eq(iter(self._fp), self._fp) - # Does this object support the iteration protocol? - unless(hasattr(self._fp, '__iter__')) - unless(hasattr(self._fp, '__next__')) - i = 0 - for line in self._fp: - eq(line, self._line + '\n') - i += 1 - eq(i, 5) - self._fp.close() - self.assertRaises(StopIteration, next, self._fp) - -class TestioStringIO(TestGenericStringIO, unittest.TestCase): - CLASS = io.StringIO - - def test_unicode(self): - - if not test_support.have_unicode: return - - # The StringIO module also supports concatenating Unicode - # snippets to larger Unicode strings. This is tested by this - # method. Note that cStringIO does not support this extension. - - f = self.CLASS() - f.write(self._line[:6]) - f.seek(3) - f.write(str(self._line[20:26])) - f.write(str(self._line[52])) - s = f.getvalue() - self.assertEqual(s, str('abcuvwxyz!')) - self.assertEqual(type(s), str) - - -def test_main(): - test_support.run_unittest(TestioStringIO) - - -if __name__ == '__main__': - test_main() diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py index ba3ceeb..fa3a8bb 100644 --- a/Lib/test/test_io.py +++ b/Lib/test/test_io.py @@ -98,7 +98,7 @@ class IOTest(unittest.TestCase): self.assertEqual(f.seek(-1, 2), 13) self.assertEqual(f.tell(), 13) self.assertEqual(f.truncate(12), 12) - self.assertEqual(f.tell(), 13) + self.assertEqual(f.tell(), 12) self.assertRaises(TypeError, f.seek, 0.0) def read_ops(self, f, buffered=False): @@ -143,7 +143,7 @@ class IOTest(unittest.TestCase): self.assertEqual(f.tell(), self.LARGE + 2) self.assertEqual(f.seek(0, 2), self.LARGE + 2) self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1) - self.assertEqual(f.tell(), self.LARGE + 2) + self.assertEqual(f.tell(), self.LARGE + 1) self.assertEqual(f.seek(0, 2), self.LARGE + 1) self.assertEqual(f.seek(-1, 2), self.LARGE) self.assertEqual(f.read(2), b"x") @@ -727,6 +727,7 @@ class TextIOWrapperTest(unittest.TestCase): txt.write("BB\nCCC\n") txt.write("X\rY\r\nZ") txt.flush() + self.assertEquals(buf.closed, False) self.assertEquals(buf.getvalue(), expected) def testNewlines(self): @@ -807,7 +808,8 @@ class TextIOWrapperTest(unittest.TestCase): txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline) txt.write(data) txt.close() - self.assertEquals(buf.getvalue(), expected) + self.assertEquals(buf.closed, True) + self.assertRaises(ValueError, buf.getvalue) finally: os.linesep = save_linesep diff --git a/Lib/test/test_largefile.py b/Lib/test/test_largefile.py index 4cafe33..0856346 100644 --- a/Lib/test/test_largefile.py +++ b/Lib/test/test_largefile.py @@ -120,14 +120,15 @@ class TestCase(unittest.TestCase): newsize -= 1 f.seek(42) f.truncate(newsize) - self.assertEqual(f.tell(), 42) # else pointer moved - f.seek(0, 2) self.assertEqual(f.tell(), newsize) # else wasn't truncated + f.seek(0, 2) + self.assertEqual(f.tell(), newsize) # XXX truncate(larger than true size) is ill-defined # across platform; cut it waaaaay back f.seek(0) f.truncate(1) - self.assertEqual(f.tell(), 0) # else pointer moved + self.assertEqual(f.tell(), 1) # else pointer moved + f.seek(0) self.assertEqual(len(f.read()), 1) # else wasn't truncated def test_main(): diff --git a/Lib/test/test_memoryio.py b/Lib/test/test_memoryio.py new file mode 100644 index 0000000..0b09ca4 --- /dev/null +++ b/Lib/test/test_memoryio.py @@ -0,0 +1,405 @@ +"""Unit tests for memory-based file-like objects. +StringIO -- for unicode strings +BytesIO -- for bytes +""" + +import unittest +from test import test_support + +import io + +try: + import _bytesio + has_c_implementation = True +except ImportError: + has_c_implementation = False + + +class MemoryTestMixin: + + def write_ops(self, f, t): + self.assertEqual(f.write(t("blah.")), 5) + self.assertEqual(f.seek(0), 0) + self.assertEqual(f.write(t("Hello.")), 6) + self.assertEqual(f.tell(), 6) + self.assertEqual(f.seek(5), 5) + self.assertEqual(f.tell(), 5) + self.assertEqual(f.write(t(" world\n\n\n")), 9) + self.assertEqual(f.seek(0), 0) + self.assertEqual(f.write(t("h")), 1) + self.assertEqual(f.truncate(12), 12) + self.assertEqual(f.tell(), 12) + + def test_write(self): + buf = self.buftype("hello world\n") + memio = self.ioclass(buf) + + self.write_ops(memio, self.buftype) + self.assertEqual(memio.getvalue(), buf) + memio = self.ioclass() + self.write_ops(memio, self.buftype) + self.assertEqual(memio.getvalue(), buf) + self.assertRaises(TypeError, memio.write, None) + memio.close() + self.assertRaises(ValueError, memio.write, self.buftype("")) + + def test_writelines(self): + buf = self.buftype("1234567890") + memio = self.ioclass() + + self.assertEqual(memio.writelines([buf] * 100), None) + self.assertEqual(memio.getvalue(), buf * 100) + memio.writelines([]) + self.assertEqual(memio.getvalue(), buf * 100) + memio = self.ioclass() + self.assertRaises(TypeError, memio.writelines, [buf] + [1]) + self.assertEqual(memio.getvalue(), buf) + self.assertRaises(TypeError, memio.writelines, None) + memio.close() + self.assertRaises(ValueError, memio.writelines, []) + + def test_writelines_error(self): + memio = self.ioclass() + def error_gen(): + yield self.buftype('spam') + raise KeyboardInterrupt + + self.assertRaises(KeyboardInterrupt, memio.writelines, error_gen()) + + def test_truncate(self): + buf = self.buftype("1234567890") + memio = self.ioclass(buf) + + self.assertRaises(ValueError, memio.truncate, -1) + memio.seek(6) + self.assertEqual(memio.truncate(), 6) + self.assertEqual(memio.getvalue(), buf[:6]) + self.assertEqual(memio.truncate(4), 4) + self.assertEqual(memio.getvalue(), buf[:4]) + self.assertEqual(memio.tell(), 4) + memio.write(buf) + self.assertEqual(memio.getvalue(), buf[:4] + buf) + pos = memio.tell() + self.assertEqual(memio.truncate(None), pos) + self.assertEqual(memio.tell(), pos) + self.assertRaises(TypeError, memio.truncate, '0') + memio.close() + self.assertRaises(ValueError, memio.truncate, 0) + + def test_init(self): + buf = self.buftype("1234567890") + memio = self.ioclass(buf) + self.assertEqual(memio.getvalue(), buf) + memio = self.ioclass(None) + self.assertEqual(memio.getvalue(), self.EOF) + memio.__init__(buf * 2) + self.assertEqual(memio.getvalue(), buf * 2) + memio.__init__(buf) + self.assertEqual(memio.getvalue(), buf) + + def test_read(self): + buf = self.buftype("1234567890") + memio = self.ioclass(buf) + + self.assertEqual(memio.read(0), self.EOF) + self.assertEqual(memio.read(1), buf[:1]) + self.assertEqual(memio.read(4), buf[1:5]) + self.assertEqual(memio.read(900), buf[5:]) + self.assertEqual(memio.read(), self.EOF) + memio.seek(0) + self.assertEqual(memio.read(), buf) + self.assertEqual(memio.read(), self.EOF) + self.assertEqual(memio.tell(), 10) + memio.seek(0) + self.assertEqual(memio.read(-1), buf) + memio.seek(0) + self.assertEqual(memio.read(None), buf) + self.assertRaises(TypeError, memio.read, '') + memio.close() + self.assertRaises(ValueError, memio.read) + + def test_readline(self): + buf = self.buftype("1234567890\n") + memio = self.ioclass(buf * 2) + + self.assertEqual(memio.readline(0), self.EOF) + self.assertEqual(memio.readline(), buf) + self.assertEqual(memio.readline(), buf) + self.assertEqual(memio.readline(), self.EOF) + memio.seek(0) + self.assertEqual(memio.readline(5), buf[:5]) + self.assertEqual(memio.readline(5), buf[5:10]) + self.assertEqual(memio.readline(5), buf[10:15]) + memio.seek(0) + self.assertEqual(memio.readline(-1), buf) + memio.seek(0) + self.assertEqual(memio.readline(0), self.EOF) + + buf = self.buftype("1234567890\n") + memio = self.ioclass((buf * 3)[:-1]) + self.assertEqual(memio.readline(), buf) + self.assertEqual(memio.readline(), buf) + self.assertEqual(memio.readline(), buf[:-1]) + self.assertEqual(memio.readline(), self.EOF) + memio.seek(0) + self.assertEqual(memio.readline(None), buf) + self.assertRaises(TypeError, memio.readline, '') + memio.close() + self.assertRaises(ValueError, memio.readline) + + def test_readlines(self): + buf = self.buftype("1234567890\n") + memio = self.ioclass(buf * 10) + + self.assertEqual(memio.readlines(), [buf] * 10) + memio.seek(5) + self.assertEqual(memio.readlines(), [buf[5:]] + [buf] * 9) + memio.seek(0) + self.assertEqual(memio.readlines(15), [buf] * 2) + memio.seek(0) + self.assertEqual(memio.readlines(-1), [buf] * 10) + memio.seek(0) + self.assertEqual(memio.readlines(0), [buf] * 10) + memio.seek(0) + self.assertEqual(memio.readlines(None), [buf] * 10) + self.assertRaises(TypeError, memio.readlines, '') + memio.close() + self.assertRaises(ValueError, memio.readlines) + + def test_iterator(self): + buf = self.buftype("1234567890\n") + memio = self.ioclass(buf * 10) + + self.assertEqual(iter(memio), memio) + self.failUnless(hasattr(memio, '__iter__')) + self.failUnless(hasattr(memio, '__next__')) + i = 0 + for line in memio: + self.assertEqual(line, buf) + i += 1 + self.assertEqual(i, 10) + memio.seek(0) + i = 0 + for line in memio: + self.assertEqual(line, buf) + i += 1 + self.assertEqual(i, 10) + memio = self.ioclass(buf * 2) + memio.close() + self.assertRaises(ValueError, memio.__next__) + + def test_getvalue(self): + buf = self.buftype("1234567890") + memio = self.ioclass(buf) + + self.assertEqual(memio.getvalue(), buf) + memio.read() + self.assertEqual(memio.getvalue(), buf) + memio = self.ioclass(buf * 1000) + self.assertEqual(memio.getvalue()[-3:], self.buftype("890")) + memio = self.ioclass(buf) + memio.close() + self.assertRaises(ValueError, memio.getvalue) + + def test_seek(self): + buf = self.buftype("1234567890") + memio = self.ioclass(buf) + + memio.read(5) + self.assertRaises(ValueError, memio.seek, -1) + self.assertRaises(ValueError, memio.seek, 1, -1) + self.assertRaises(ValueError, memio.seek, 1, 3) + self.assertEqual(memio.seek(0), 0) + self.assertEqual(memio.seek(0, 0), 0) + self.assertEqual(memio.read(), buf) + self.assertEqual(memio.seek(3), 3) + self.assertEqual(memio.seek(0, 1), 3) + self.assertEqual(memio.read(), buf[3:]) + self.assertEqual(memio.seek(len(buf)), len(buf)) + self.assertEqual(memio.read(), self.EOF) + memio.seek(len(buf) + 1) + self.assertEqual(memio.read(), self.EOF) + self.assertEqual(memio.seek(0, 2), len(buf)) + self.assertEqual(memio.read(), self.EOF) + memio.close() + self.assertRaises(ValueError, memio.seek, 0) + + def test_overseek(self): + buf = self.buftype("1234567890") + memio = self.ioclass(buf) + + self.assertEqual(memio.seek(len(buf) + 1), 11) + self.assertEqual(memio.read(), self.EOF) + self.assertEqual(memio.tell(), 11) + self.assertEqual(memio.getvalue(), buf) + memio.write(self.EOF) + self.assertEqual(memio.getvalue(), buf) + memio.write(buf) + self.assertEqual(memio.getvalue(), buf + self.buftype('\0') + buf) + + def test_tell(self): + buf = self.buftype("1234567890") + memio = self.ioclass(buf) + + self.assertEqual(memio.tell(), 0) + memio.seek(5) + self.assertEqual(memio.tell(), 5) + memio.seek(10000) + self.assertEqual(memio.tell(), 10000) + memio.close() + self.assertRaises(ValueError, memio.tell) + + def test_flush(self): + buf = self.buftype("1234567890") + memio = self.ioclass(buf) + + self.assertEqual(memio.flush(), None) + + def test_flags(self): + memio = self.ioclass() + + self.assertEqual(memio.writable(), True) + self.assertEqual(memio.readable(), True) + self.assertEqual(memio.seekable(), True) + self.assertEqual(memio.isatty(), False) + self.assertEqual(memio.closed, False) + memio.close() + self.assertEqual(memio.writable(), True) + self.assertEqual(memio.readable(), True) + self.assertEqual(memio.seekable(), True) + self.assertRaises(ValueError, memio.isatty) + self.assertEqual(memio.closed, True) + + def test_subclassing(self): + buf = self.buftype("1234567890") + def test1(): + class MemIO(self.ioclass): + pass + m = MemIO(buf) + return m.getvalue() + def test2(): + class MemIO(self.ioclass): + def __init__(me, a, b): + self.ioclass.__init__(me, a) + m = MemIO(buf, None) + return m.getvalue() + self.assertEqual(test1(), buf) + self.assertEqual(test2(), buf) + + +class PyBytesIOTest(MemoryTestMixin, unittest.TestCase): + @staticmethod + def buftype(s): + return s.encode("ascii") + ioclass = io._BytesIO + EOF = b"" + + def test_read1(self): + buf = self.buftype("1234567890") + memio = self.ioclass(buf) + + self.assertRaises(TypeError, memio.read1) + self.assertEqual(memio.read(), buf) + + def test_readinto(self): + buf = self.buftype("1234567890") + memio = self.ioclass(buf) + + b = bytearray(b"hello") + self.assertEqual(memio.readinto(b), 5) + self.assertEqual(b, b"12345") + self.assertEqual(memio.readinto(b), 5) + self.assertEqual(b, b"67890") + self.assertEqual(memio.readinto(b), 0) + self.assertEqual(b, b"67890") + b = bytearray(b"hello world") + memio.seek(0) + self.assertEqual(memio.readinto(b), 10) + self.assertEqual(b, b"1234567890d") + b = bytearray(b"") + memio.seek(0) + self.assertEqual(memio.readinto(b), 0) + self.assertEqual(b, b"") + self.assertRaises(TypeError, memio.readinto, '') + import array + a = array.array('b', b"hello world") + memio = self.ioclass(buf) + memio.readinto(a) + self.assertEqual(a.tostring(), b"1234567890d") + memio.close() + self.assertRaises(ValueError, memio.readinto, b) + + def test_relative_seek(self): + buf = self.buftype("1234567890") + memio = self.ioclass(buf) + + self.assertEqual(memio.seek(-1, 1), 0) + self.assertEqual(memio.seek(3, 1), 3) + self.assertEqual(memio.seek(-4, 1), 0) + self.assertEqual(memio.seek(-1, 2), 9) + self.assertEqual(memio.seek(1, 1), 10) + self.assertEqual(memio.seek(1, 2), 11) + memio.seek(-3, 2) + self.assertEqual(memio.read(), buf[-3:]) + memio.seek(0) + memio.seek(1, 1) + self.assertEqual(memio.read(), buf[1:]) + + def test_unicode(self): + memio = self.ioclass() + + self.assertRaises(TypeError, self.ioclass, "1234567890") + self.assertRaises(TypeError, memio.write, "1234567890") + self.assertRaises(TypeError, memio.writelines, ["1234567890"]) + + def test_bytes_array(self): + buf = b"1234567890" + import array + a = array.array('b', list(buf)) + memio = self.ioclass(a) + self.assertEqual(memio.getvalue(), buf) + self.assertEqual(memio.write(a), 10) + self.assertEqual(memio.getvalue(), buf) + + +class PyStringIOTest(MemoryTestMixin, unittest.TestCase): + buftype = str + ioclass = io.StringIO + EOF = "" + + def test_relative_seek(self): + memio = self.ioclass() + + self.assertRaises(IOError, memio.seek, -1, 1) + self.assertRaises(IOError, memio.seek, 3, 1) + self.assertRaises(IOError, memio.seek, -3, 1) + self.assertRaises(IOError, memio.seek, -1, 2) + self.assertRaises(IOError, memio.seek, 1, 1) + self.assertRaises(IOError, memio.seek, 1, 2) + + # XXX: For the Python version of io.StringIO, this is highly + # dependent on the encoding used for the underlying buffer. + # def test_widechar(self): + # buf = self.buftype("\U0002030a\U00020347") + # memio = self.ioclass(buf) + # + # self.assertEqual(memio.getvalue(), buf) + # self.assertEqual(memio.write(buf), len(buf)) + # self.assertEqual(memio.tell(), len(buf)) + # self.assertEqual(memio.getvalue(), buf) + # self.assertEqual(memio.write(buf), len(buf)) + # self.assertEqual(memio.tell(), len(buf) * 2) + # self.assertEqual(memio.getvalue(), buf + buf) + +if has_c_implementation: + class CBytesIOTest(PyBytesIOTest): + ioclass = io.BytesIO + +def test_main(): + tests = [PyBytesIOTest, PyStringIOTest] + if has_c_implementation: + tests.extend([CBytesIOTest]) + test_support.run_unittest(*tests) + +if __name__ == '__main__': + test_main() diff --git a/Lib/test/test_mimetools.py b/Lib/test/test_mimetools.py index cf0e191..ef333f5 100644 --- a/Lib/test/test_mimetools.py +++ b/Lib/test/test_mimetools.py @@ -58,7 +58,7 @@ class MimeToolsTest(unittest.TestCase): s.add(nb) def test_message(self): - msg = mimetools.Message(io.StringIO(msgtext1)) + msg = mimetools.Message(io.StringIO(str(msgtext1))) self.assertEqual(msg.gettype(), "text/plain") self.assertEqual(msg.getmaintype(), "text") self.assertEqual(msg.getsubtype(), "plain") |