1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
|
from _csv import Error, __version__, writer, reader, register_dialect, \
unregister_dialect, get_dialect, list_dialects, \
QUOTE_MINIMAL, QUOTE_ALL, QUOTE_NONNUMERIC, QUOTE_NONE, \
__doc__
__all__ = [ "QUOTE_MINIMAL", "QUOTE_ALL", "QUOTE_NONNUMERIC", "QUOTE_NONE",
"Error", "Dialect", "excel", "excel_tab", "reader", "writer",
"register_dialect", "get_dialect", "list_dialects",
"unregister_dialect", "__version__", "DictReader", "DictWriter" ]
class Dialect:
_name = ""
_valid = False
# placeholders
delimiter = None
quotechar = None
escapechar = None
doublequote = None
skipinitialspace = None
lineterminator = None
quoting = None
def __init__(self):
if self.__class__ != Dialect:
self._valid = True
errors = self._validate()
if errors != []:
raise Error, "Dialect did not validate: %s" % ", ".join(errors)
def _validate(self):
errors = []
if not self._valid:
errors.append("can't directly instantiate Dialect class")
if self.delimiter is None:
errors.append("delimiter character not set")
elif (not isinstance(self.delimiter, str) or
len(self.delimiter) > 1):
errors.append("delimiter must be one-character string")
if self.quotechar is None:
if self.quoting != QUOTE_NONE:
errors.append("quotechar not set")
elif (not isinstance(self.quotechar, str) or
len(self.quotechar) > 1):
errors.append("quotechar must be one-character string")
if self.lineterminator is None:
errors.append("lineterminator not set")
elif not isinstance(self.lineterminator, str):
errors.append("lineterminator must be a string")
if self.doublequote not in (True, False):
errors.append("doublequote parameter must be True or False")
if self.skipinitialspace not in (True, False):
errors.append("skipinitialspace parameter must be True or False")
if self.quoting is None:
errors.append("quoting parameter not set")
if self.quoting is QUOTE_NONE:
if (not isinstance(self.escapechar, (unicode, str)) or
len(self.escapechar) > 1):
errors.append("escapechar must be a one-character string or unicode object")
return errors
class excel(Dialect):
delimiter = ','
quotechar = '"'
doublequote = True
skipinitialspace = False
lineterminator = '\r\n'
quoting = QUOTE_MINIMAL
register_dialect("excel", excel)
class excel_tab(excel):
delimiter = '\t'
register_dialect("excel-tab", excel_tab)
class DictReader:
def __init__(self, f, fieldnames, restkey=None, restval=None,
dialect="excel", *args):
self.fieldnames = fieldnames # list of keys for the dict
self.restkey = restkey # key to catch long rows
self.restval = restval # default value for short rows
self.reader = reader(f, dialect, *args)
def __iter__(self):
return self
def next(self):
row = self.reader.next()
# unlike the basic reader, we prefer not to return blanks,
# because we will typically wind up with a dict full of None
# values
while row == []:
row = self.reader.next()
d = dict(zip(self.fieldnames, row))
lf = len(self.fieldnames)
lr = len(row)
if lf < lr:
d[self.restkey] = row[lf:]
elif lf > lr:
for key in self.fieldnames[lr:]:
d[key] = self.restval
return d
class DictWriter:
def __init__(self, f, fieldnames, restval="", extrasaction="raise",
dialect="excel", *args):
self.fieldnames = fieldnames # list of keys for the dict
self.restval = restval # for writing short dicts
if extrasaction.lower() not in ("raise", "ignore"):
raise ValueError, \
("extrasaction (%s) must be 'raise' or 'ignore'" %
extrasaction)
self.extrasaction = extrasaction
self.writer = writer(f, dialect, *args)
def _dict_to_list(self, rowdict):
if self.extrasaction == "raise":
for k in rowdict.keys():
if k not in self.fieldnames:
raise ValueError, "dict contains fields not in fieldnames"
return [rowdict.get(key, self.restval) for key in self.fieldnames]
def writerow(self, rowdict):
return self.writer.writerow(self._dict_to_list(rowdict))
def writerows(self, rowdicts):
rows = []
for rowdict in rowdicts:
rows.append(self._dict_to_list(rowdict))
return self.writer.writerows(rows)
|