summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_csv.py
diff options
context:
space:
mode:
authorGuido van Rossum <guido@python.org>2007-08-06 19:32:18 (GMT)
committerGuido van Rossum <guido@python.org>2007-08-06 19:32:18 (GMT)
commit4626458cc57b73dc01edf4987bbc3fe1d4437e59 (patch)
tree4bfbbbeb5cae21f552709414a1a345fe5ecd544d /Lib/test/test_csv.py
parentdd766d53a2feebf1eef3d27c7192d0adc38d2fdd (diff)
downloadcpython-4626458cc57b73dc01edf4987bbc3fe1d4437e59.zip
cpython-4626458cc57b73dc01edf4987bbc3fe1d4437e59.tar.gz
cpython-4626458cc57b73dc01edf4987bbc3fe1d4437e59.tar.bz2
SF patch# 1767398 by Adam Hupp.
Fix csv to read/write bytes from/to binary files. Fix the unit tests to test this and to use with TemporaryFile().
Diffstat (limited to 'Lib/test/test_csv.py')
-rw-r--r--Lib/test/test_csv.py249
1 files changed, 75 insertions, 174 deletions
diff --git a/Lib/test/test_csv.py b/Lib/test/test_csv.py
index e623a02..6098285 100644
--- a/Lib/test/test_csv.py
+++ b/Lib/test/test_csv.py
@@ -6,7 +6,7 @@ import sys
import os
import unittest
from StringIO import StringIO
-import tempfile
+from tempfile import TemporaryFile
import csv
import gc
from test import test_support
@@ -117,17 +117,12 @@ class Test_Csv(unittest.TestCase):
def _write_test(self, fields, expect, **kwargs):
- fd, name = tempfile.mkstemp()
- fileobj = os.fdopen(fd, "w+b")
- try:
+ with TemporaryFile("w+b") as fileobj:
writer = csv.writer(fileobj, **kwargs)
writer.writerow(fields)
fileobj.seek(0)
- self.assertEqual(fileobj.read(),
+ self.assertEqual(str(fileobj.read()),
expect + writer.dialect.lineterminator)
- finally:
- fileobj.close()
- os.unlink(name)
def test_write_arg_valid(self):
self.assertRaises(csv.Error, self._write_test, None, '')
@@ -192,17 +187,13 @@ class Test_Csv(unittest.TestCase):
raise IOError
writer = csv.writer(BrokenFile())
self.assertRaises(IOError, writer.writerows, [['a']])
- fd, name = tempfile.mkstemp()
- fileobj = os.fdopen(fd, "w+b")
- try:
+
+ with TemporaryFile("w+b") as fileobj:
writer = csv.writer(fileobj)
self.assertRaises(TypeError, writer.writerows, None)
writer.writerows([['a','b'],['c','d']])
fileobj.seek(0)
- self.assertEqual(fileobj.read(), "a,b\r\nc,d\r\n")
- finally:
- fileobj.close()
- os.unlink(name)
+ self.assertEqual(fileobj.read(), b"a,b\r\nc,d\r\n")
def _read_test(self, input, expect, **kwargs):
reader = csv.reader(input, **kwargs)
@@ -333,17 +324,19 @@ class TestDialectRegistry(unittest.TestCase):
quoting = csv.QUOTE_NONE
escapechar = "\\"
- fd, name = tempfile.mkstemp()
- fileobj = os.fdopen(fd, "w+b")
- try:
+ with TemporaryFile("w+") as fileobj:
fileobj.write("abc def\nc1ccccc1 benzene\n")
fileobj.seek(0)
reader = csv.reader(fileobj, dialect=space())
self.assertEqual(next(reader), ["abc", "def"])
self.assertEqual(next(reader), ["c1ccccc1", "benzene"])
- finally:
- fileobj.close()
- os.unlink(name)
+
+ def compare_dialect_123(self, expected, *writeargs, **kwwriteargs):
+ with TemporaryFile("w+b") as fileobj:
+ writer = csv.writer(fileobj, *writeargs, **kwwriteargs)
+ writer.writerow([1,2,3])
+ fileobj.seek(0)
+ self.assertEqual(str(fileobj.read()), expected)
def test_dialect_apply(self):
class testA(csv.excel):
@@ -352,63 +345,19 @@ class TestDialectRegistry(unittest.TestCase):
delimiter = ":"
class testC(csv.excel):
delimiter = "|"
+ class testUni(csv.excel):
+ delimiter = "\u039B"
csv.register_dialect('testC', testC)
try:
- fd, name = tempfile.mkstemp()
- fileobj = os.fdopen(fd, "w+b")
- try:
- writer = csv.writer(fileobj)
- writer.writerow([1,2,3])
- fileobj.seek(0)
- self.assertEqual(fileobj.read(), "1,2,3\r\n")
- finally:
- fileobj.close()
- os.unlink(name)
-
- fd, name = tempfile.mkstemp()
- fileobj = os.fdopen(fd, "w+b")
- try:
- writer = csv.writer(fileobj, testA)
- writer.writerow([1,2,3])
- fileobj.seek(0)
- self.assertEqual(fileobj.read(), "1\t2\t3\r\n")
- finally:
- fileobj.close()
- os.unlink(name)
-
- fd, name = tempfile.mkstemp()
- fileobj = os.fdopen(fd, "w+b")
- try:
- writer = csv.writer(fileobj, dialect=testB())
- writer.writerow([1,2,3])
- fileobj.seek(0)
- self.assertEqual(fileobj.read(), "1:2:3\r\n")
- finally:
- fileobj.close()
- os.unlink(name)
-
- fd, name = tempfile.mkstemp()
- fileobj = os.fdopen(fd, "w+b")
- try:
- writer = csv.writer(fileobj, dialect='testC')
- writer.writerow([1,2,3])
- fileobj.seek(0)
- self.assertEqual(fileobj.read(), "1|2|3\r\n")
- finally:
- fileobj.close()
- os.unlink(name)
-
- fd, name = tempfile.mkstemp()
- fileobj = os.fdopen(fd, "w+b")
- try:
- writer = csv.writer(fileobj, dialect=testA, delimiter=';')
- writer.writerow([1,2,3])
- fileobj.seek(0)
- self.assertEqual(fileobj.read(), "1;2;3\r\n")
- finally:
- fileobj.close()
- os.unlink(name)
+ self.compare_dialect_123("1,2,3\r\n")
+ self.compare_dialect_123("1\t2\t3\r\n", testA)
+ self.compare_dialect_123("1:2:3\r\n", dialect=testB())
+ self.compare_dialect_123("1|2|3\r\n", dialect='testC')
+ self.compare_dialect_123("1;2;3\r\n", dialect=testA,
+ delimiter=';')
+ self.compare_dialect_123("1\u039B2\u039B3\r\n",
+ dialect=testUni)
finally:
csv.unregister_dialect('testC')
@@ -423,29 +372,19 @@ class TestDialectRegistry(unittest.TestCase):
class TestCsvBase(unittest.TestCase):
def readerAssertEqual(self, input, expected_result):
- fd, name = tempfile.mkstemp()
- fileobj = os.fdopen(fd, "w+b")
- try:
+ with TemporaryFile("w+") as fileobj:
fileobj.write(input)
fileobj.seek(0)
reader = csv.reader(fileobj, dialect = self.dialect)
fields = list(reader)
self.assertEqual(fields, expected_result)
- finally:
- fileobj.close()
- os.unlink(name)
def writerAssertEqual(self, input, expected_result):
- fd, name = tempfile.mkstemp()
- fileobj = os.fdopen(fd, "w+b")
- try:
+ with TemporaryFile("w+b") as fileobj:
writer = csv.writer(fileobj, dialect = self.dialect)
writer.writerows(input)
fileobj.seek(0)
- self.assertEqual(fileobj.read(), expected_result)
- finally:
- fileobj.close()
- os.unlink(name)
+ self.assertEqual(str(fileobj.read()), expected_result)
class TestDialectExcel(TestCsvBase):
dialect = 'excel'
@@ -574,91 +513,59 @@ class TestDictFields(unittest.TestCase):
### "long" means the row is longer than the number of fieldnames
### "short" means there are fewer elements in the row than fieldnames
def test_write_simple_dict(self):
- fd, name = tempfile.mkstemp()
- fileobj = os.fdopen(fd, "w+b")
- try:
+ with TemporaryFile("w+b") as fileobj:
writer = csv.DictWriter(fileobj, fieldnames = ["f1", "f2", "f3"])
writer.writerow({"f1": 10, "f3": "abc"})
fileobj.seek(0)
- self.assertEqual(fileobj.read(), "10,,abc\r\n")
- finally:
- fileobj.close()
- os.unlink(name)
+ self.assertEqual(str(fileobj.read()), "10,,abc\r\n")
def test_write_no_fields(self):
fileobj = StringIO()
self.assertRaises(TypeError, csv.DictWriter, fileobj)
def test_read_dict_fields(self):
- fd, name = tempfile.mkstemp()
- fileobj = os.fdopen(fd, "w+b")
- try:
+ with TemporaryFile("w+") as fileobj:
fileobj.write("1,2,abc\r\n")
fileobj.seek(0)
reader = csv.DictReader(fileobj,
fieldnames=["f1", "f2", "f3"])
self.assertEqual(next(reader), {"f1": '1', "f2": '2', "f3": 'abc'})
- finally:
- fileobj.close()
- os.unlink(name)
def test_read_dict_no_fieldnames(self):
- fd, name = tempfile.mkstemp()
- fileobj = os.fdopen(fd, "w+b")
- try:
+ with TemporaryFile("w+") as fileobj:
fileobj.write("f1,f2,f3\r\n1,2,abc\r\n")
fileobj.seek(0)
reader = csv.DictReader(fileobj)
self.assertEqual(next(reader), {"f1": '1', "f2": '2', "f3": 'abc'})
- finally:
- fileobj.close()
- os.unlink(name)
def test_read_long(self):
- fd, name = tempfile.mkstemp()
- fileobj = os.fdopen(fd, "w+b")
- try:
+ with TemporaryFile("w+") as fileobj:
fileobj.write("1,2,abc,4,5,6\r\n")
fileobj.seek(0)
reader = csv.DictReader(fileobj,
fieldnames=["f1", "f2"])
self.assertEqual(next(reader), {"f1": '1', "f2": '2',
None: ["abc", "4", "5", "6"]})
- finally:
- fileobj.close()
- os.unlink(name)
def test_read_long_with_rest(self):
- fd, name = tempfile.mkstemp()
- fileobj = os.fdopen(fd, "w+b")
- try:
+ with TemporaryFile("w+") as fileobj:
fileobj.write("1,2,abc,4,5,6\r\n")
fileobj.seek(0)
reader = csv.DictReader(fileobj,
fieldnames=["f1", "f2"], restkey="_rest")
self.assertEqual(next(reader), {"f1": '1', "f2": '2',
"_rest": ["abc", "4", "5", "6"]})
- finally:
- fileobj.close()
- os.unlink(name)
def test_read_long_with_rest_no_fieldnames(self):
- fd, name = tempfile.mkstemp()
- fileobj = os.fdopen(fd, "w+b")
- try:
+ with TemporaryFile("w+") as fileobj:
fileobj.write("f1,f2\r\n1,2,abc,4,5,6\r\n")
fileobj.seek(0)
reader = csv.DictReader(fileobj, restkey="_rest")
self.assertEqual(next(reader), {"f1": '1', "f2": '2',
"_rest": ["abc", "4", "5", "6"]})
- finally:
- fileobj.close()
- os.unlink(name)
def test_read_short(self):
- fd, name = tempfile.mkstemp()
- fileobj = os.fdopen(fd, "w+b")
- try:
+ with TemporaryFile("w+") as fileobj:
fileobj.write("1,2,abc,4,5,6\r\n1,2,abc\r\n")
fileobj.seek(0)
reader = csv.DictReader(fileobj,
@@ -669,9 +576,6 @@ class TestDictFields(unittest.TestCase):
self.assertEqual(next(reader), {"1": '1', "2": '2', "3": 'abc',
"4": 'DEFAULT', "5": 'DEFAULT',
"6": 'DEFAULT'})
- finally:
- fileobj.close()
- os.unlink(name)
def test_read_multi(self):
sample = [
@@ -710,64 +614,45 @@ class TestArrayWrites(unittest.TestCase):
contents = [(20-i) for i in range(20)]
a = array.array('i', contents)
- fd, name = tempfile.mkstemp()
- fileobj = os.fdopen(fd, "w+b")
- try:
+ with TemporaryFile("w+b") as fileobj:
writer = csv.writer(fileobj, dialect="excel")
writer.writerow(a)
expected = ",".join([str(i) for i in a])+"\r\n"
fileobj.seek(0)
- self.assertEqual(fileobj.read(), expected)
- finally:
- fileobj.close()
- os.unlink(name)
+ self.assertEqual(str(fileobj.read()), expected)
def test_double_write(self):
import array
contents = [(20-i)*0.1 for i in range(20)]
a = array.array('d', contents)
- fd, name = tempfile.mkstemp()
- fileobj = os.fdopen(fd, "w+b")
- try:
+ with TemporaryFile("w+b") as fileobj:
writer = csv.writer(fileobj, dialect="excel")
writer.writerow(a)
expected = ",".join([str(i) for i in a])+"\r\n"
fileobj.seek(0)
- self.assertEqual(fileobj.read(), expected)
- finally:
- fileobj.close()
- os.unlink(name)
+ self.assertEqual(str(fileobj.read()), expected)
def test_float_write(self):
import array
contents = [(20-i)*0.1 for i in range(20)]
a = array.array('f', contents)
- fd, name = tempfile.mkstemp()
- fileobj = os.fdopen(fd, "w+b")
- try:
+ with TemporaryFile("w+b") as fileobj:
writer = csv.writer(fileobj, dialect="excel")
writer.writerow(a)
expected = ",".join([str(i) for i in a])+"\r\n"
fileobj.seek(0)
- self.assertEqual(fileobj.read(), expected)
- finally:
- fileobj.close()
- os.unlink(name)
+ self.assertEqual(str(fileobj.read()), expected)
def test_char_write(self):
import array, string
- a = array.array('c', string.letters)
- fd, name = tempfile.mkstemp()
- fileobj = os.fdopen(fd, "w+b")
- try:
+ a = array.array('u', string.letters)
+
+ with TemporaryFile("w+b") as fileobj:
writer = csv.writer(fileobj, dialect="excel")
writer.writerow(a)
expected = ",".join(a)+"\r\n"
fileobj.seek(0)
- self.assertEqual(fileobj.read(), expected)
- finally:
- fileobj.close()
- os.unlink(name)
+ self.assertEqual(str(fileobj.read()), expected)
class TestDialectValidity(unittest.TestCase):
def test_quoting(self):
@@ -970,20 +855,36 @@ else:
# if writer leaks during write, last delta should be 5 or more
self.assertEqual(delta < 5, True)
-# commented out for now - csv module doesn't yet support Unicode
-## class TestUnicode(unittest.TestCase):
-## def test_unicode_read(self):
-## import codecs
-## f = codecs.EncodedFile(StringIO("Martin von Löwis,"
-## "Marc André Lemburg,"
-## "Guido van Rossum,"
-## "François Pinard\r\n"),
-## data_encoding='iso-8859-1')
-## reader = csv.reader(f)
-## self.assertEqual(list(reader), [["Martin von Löwis",
-## "Marc André Lemburg",
-## "Guido van Rossum",
-## "François Pinardn"]])
+class TestUnicode(unittest.TestCase):
+
+ names = ["Martin von Löwis",
+ "Marc André Lemburg",
+ "Guido van Rossum",
+ "François Pinard"]
+
+ def test_unicode_read(self):
+ import io
+ fileobj = io.TextIOWrapper(TemporaryFile("w+b"), encoding="utf-16")
+ with fileobj as fileobj:
+ fileobj.write(",".join(self.names) + "\r\n")
+
+ fileobj.seek(0)
+ reader = csv.reader(fileobj)
+ self.assertEqual(list(reader), [self.names])
+
+
+ def test_unicode_write(self):
+ import io
+ with TemporaryFile("w+b") as fileobj:
+ encwriter = io.TextIOWrapper(fileobj, encoding="utf-8")
+ writer = csv.writer(encwriter)
+ writer.writerow(self.names)
+ expected = ",".join(self.names)+"\r\n"
+ fileobj.seek(0)
+ self.assertEqual(str(fileobj.read()), expected)
+
+
+
def test_main():
mod = sys.modules[__name__]