from collections import namedtuple import csv import re import textwrap from . import NOT_SET, strutil, fsutil EMPTY = '-' UNKNOWN = '???' def parse_markers(markers, default=None): if markers is NOT_SET: return default if not markers: return None if type(markers) is not str: return markers if markers == markers[0] * len(markers): return [markers] return list(markers) def fix_row(row, **markers): if isinstance(row, str): raise NotImplementedError(row) empty = parse_markers(markers.pop('empty', ('-',))) unknown = parse_markers(markers.pop('unknown', ('???',))) row = (val if val else None for val in row) if not empty: if unknown: row = (UNKNOWN if val in unknown else val for val in row) elif not unknown: row = (EMPTY if val in empty else val for val in row) else: row = (EMPTY if val in empty else (UNKNOWN if val in unknown else val) for val in row) return tuple(row) def _fix_read_default(row): for value in row: yield value.strip() def _fix_write_default(row, empty=''): for value in row: yield empty if value is None else str(value) def _normalize_fix_read(fix): if fix is None: fix = '' if callable(fix): def fix_row(row): values = fix(row) return _fix_read_default(values) elif isinstance(fix, str): def fix_row(row): values = _fix_read_default(row) return (None if v == fix else v for v in values) else: raise NotImplementedError(fix) return fix_row def _normalize_fix_write(fix, empty=''): if fix is None: fix = empty if callable(fix): def fix_row(row): values = fix(row) return _fix_write_default(values, empty) elif isinstance(fix, str): def fix_row(row): return _fix_write_default(row, fix) else: raise NotImplementedError(fix) return fix_row def read_table(infile, header, *, sep='\t', fix=None, _open=open, _get_reader=csv.reader, ): """Yield each row of the given ???-separated (e.g. tab) file.""" if isinstance(infile, str): with _open(infile, newline='') as infile: yield from read_table( infile, header, sep=sep, fix=fix, _open=_open, _get_reader=_get_reader, ) return lines = strutil._iter_significant_lines(infile) # Validate the header. if not isinstance(header, str): header = sep.join(header) try: actualheader = next(lines).strip() except StopIteration: actualheader = '' if actualheader != header: raise ValueError(f'bad header {actualheader!r}') fix_row = _normalize_fix_read(fix) for row in _get_reader(lines, delimiter=sep or '\t'): yield tuple(fix_row(row)) def write_table(outfile, header, rows, *, sep='\t', fix=None, backup=True, _open=open, _get_writer=csv.writer, ): """Write each of the rows to the given ???-separated (e.g. tab) file.""" if backup: fsutil.create_backup(outfile, backup) if isinstance(outfile, str): with _open(outfile, 'w', newline='') as outfile: return write_table( outfile, header, rows, sep=sep, fix=fix, backup=backup, _open=_open, _get_writer=_get_writer, ) if isinstance(header, str): header = header.split(sep or '\t') fix_row = _normalize_fix_write(fix) writer = _get_writer(outfile, delimiter=sep or '\t') writer.writerow(header) for row in rows: writer.writerow( tuple(fix_row(row)) ) def parse_table(entries, sep, header=None, rawsep=None, *, default=NOT_SET, strict=True, ): header, sep = _normalize_table_file_props(header, sep) if not sep: raise ValueError('missing "sep"') ncols = None if header: if strict: ncols = len(header.split(sep)) cur_file = None for line, filename in strutil.parse_entries(entries, ignoresep=sep): _sep = sep if filename: if header and cur_file != filename: cur_file = filename # Skip the first line if it's the header. if line.strip() == header: continue else: # We expected the header. raise NotImplementedError((header, line)) elif rawsep and sep not in line: _sep = rawsep row = _parse_row(line, _sep, ncols, default) if strict and not ncols: ncols = len(row) yield row, filename def parse_row(line, sep, *, ncols=None, default=NOT_SET): if not sep: raise ValueError('missing "sep"') return _parse_row(line, sep, ncols, default) def _parse_row(line, sep, ncols, default): row = tuple(v.strip() for v in line.split(sep)) if (ncols or 0) > 0: diff = ncols - len(row) if diff: if default is NOT_SET or diff < 0: raise Exception(f'bad row (expected {ncols} columns, got {row!r})') row += (default,) * diff return row def _normalize_table_file_props(header, sep): if not header: return None, sep if not isinstance(header, str): if not sep: raise NotImplementedError(header) header = sep.join(header) elif not sep: for sep in ('\t', ',', ' '): if sep in header: break else: sep = None return header, sep ################################## # stdout tables WIDTH = 20 def resolve_columns(specs): if isinstance(specs, str): specs = specs.replace(',', ' ').strip().split() resolved = [] for raw in specs: column = ColumnSpec.from_raw(raw) resolved.append(column) return resolved def build_table(specs, *, sep=' ', defaultwidth=None): columns = resolve_columns(specs) return _build_table(columns, sep=sep, defaultwidth=defaultwidth) class ColumnSpec(namedtuple('ColumnSpec', 'field label fmt')): REGEX = re.compile(textwrap.dedent(r''' ^ (?: \[ ( (?: [^\s\]] [^\]]* )? [^\s\]] ) #