Diffstat (limited to 'Tools/c-analyzer/c_common')
-rw-r--r-- | Tools/c-analyzer/c_common/__init__.py   |   2
-rw-r--r-- | Tools/c-analyzer/c_common/clsutil.py    | 117
-rw-r--r-- | Tools/c-analyzer/c_common/fsutil.py     | 388
-rw-r--r-- | Tools/c-analyzer/c_common/info.py       |   0
-rw-r--r-- | Tools/c-analyzer/c_common/iterutil.py   |  48
-rw-r--r-- | Tools/c-analyzer/c_common/logging.py    |  63
-rw-r--r-- | Tools/c-analyzer/c_common/misc.py       |   7
-rw-r--r-- | Tools/c-analyzer/c_common/scriptutil.py | 577
-rw-r--r-- | Tools/c-analyzer/c_common/show.py       |   0
-rw-r--r-- | Tools/c-analyzer/c_common/strutil.py    |  42
-rw-r--r-- | Tools/c-analyzer/c_common/tables.py     | 213
11 files changed, 1457 insertions, 0 deletions
diff --git a/Tools/c-analyzer/c_common/__init__.py b/Tools/c-analyzer/c_common/__init__.py
new file mode 100644
index 0000000..a4c3bb2
--- /dev/null
+++ b/Tools/c-analyzer/c_common/__init__.py
@@ -0,0 +1,2 @@
+
+NOT_SET = object()
diff --git a/Tools/c-analyzer/c_common/clsutil.py b/Tools/c-analyzer/c_common/clsutil.py
new file mode 100644
index 0000000..aa5f6b9
--- /dev/null
+++ b/Tools/c-analyzer/c_common/clsutil.py
@@ -0,0 +1,117 @@
+
+_NOT_SET = object()
+
+
+class Slot:
+    """A descriptor that provides a slot.
+
+    This is useful for types that can't have slots via __slots__,
+    e.g. tuple subclasses.
+    """
+
+    __slots__ = ('initial', 'default', 'readonly', 'instances', 'name')
+
+    def __init__(self, initial=_NOT_SET, *,
+                 default=_NOT_SET,
+                 readonly=False,
+                 ):
+        self.initial = initial
+        self.default = default
+        self.readonly = readonly
+
+        # The instance cache is not inherently tied to the normal
+        # lifetime of the instances.  So we must do something in order
+        # to avoid keeping the instances alive by holding a reference
+        # here.  Ideally we would use weakref.WeakValueDictionary to do
+        # this.  However, most builtin types do not support weakrefs.
+        # So instead we monkey-patch __del__ on the attached class to
+        # clear the instance.
+        self.instances = {}
+        self.name = None
+
+    def __set_name__(self, cls, name):
+        if self.name is not None:
+            raise TypeError('already used')
+        self.name = name
+        try:
+            slotnames = cls.__slot_names__
+        except AttributeError:
+            slotnames = cls.__slot_names__ = []
+        slotnames.append(name)
+        self._ensure___del__(cls, slotnames)
+
+    def __get__(self, obj, cls):
+        if obj is None:  # called on the class
+            return self
+        try:
+            value = self.instances[id(obj)]
+        except KeyError:
+            if self.initial is _NOT_SET:
+                value = self.default
+            else:
+                value = self.initial
+            self.instances[id(obj)] = value
+        if value is _NOT_SET:
+            raise AttributeError(self.name)
+        # XXX Optionally make a copy?
+        return value
+
+    def __set__(self, obj, value):
+        if self.readonly:
+            raise AttributeError(f'{self.name} is readonly')
+        # XXX Optionally coerce?
+        self.instances[id(obj)] = value
+
+    def __delete__(self, obj):
+        if self.readonly:
+            raise AttributeError(f'{self.name} is readonly')
+        self.instances[id(obj)] = self.default  # XXX refleak?
+
+    def _ensure___del__(self, cls, slotnames):  # See the comment in __init__().
+        try:
+            old___del__ = cls.__del__
+        except AttributeError:
+            old___del__ = (lambda s: None)
+        else:
+            if getattr(old___del__, '_slotted', False):
+                return
+
+        def __del__(_self):
+            for name in slotnames:
+                delattr(_self, name)
+            old___del__(_self)
+        __del__._slotted = True
+        cls.__del__ = __del__
+
+    def set(self, obj, value):
+        """Update the cached value for an object.
+
+        This works even if the descriptor is read-only.  This is
+        particularly useful when initializing the object (e.g. in
+        its __new__ or __init__).
+        """
+        self.instances[id(obj)] = value
+
+
+class classonly:
+    """A non-data descriptor that makes a value only visible on the class.
+
+    This is like the "classmethod" builtin, but does not show up on
+    instances of the class.  It may be used as a decorator.
+    """
+
+    def __init__(self, value):
+        self.value = value
+        self.getter = classmethod(value).__get__
+        self.name = None
+
+    def __set_name__(self, cls, name):
+        if self.name is not None:
+            raise TypeError('already used')
+        self.name = name
+
+    def __get__(self, obj, cls):
+        if obj is not None:
+            raise AttributeError(self.name)
+        # called on the class
+        return self.getter(None, cls)
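[Editor's sketch, not part of the patch: how Slot is meant to be used on a
tuple subclass, which cannot grow fields via __slots__.  The import path
assumes the Tools/c-analyzer directory is on sys.path.]

from c_common.clsutil import Slot

class Point(tuple):
    # Per-instance storage lives in the descriptor, not on the tuple.
    x = Slot()
    y = Slot()

    def __new__(cls, x, y):
        self = super().__new__(cls, (x, y))
        self.x = x   # goes through Slot.__set__
        self.y = y
        return self

p = Point(1, 2)
print(p.x, p.y)   # 1 2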
+ """ + + def __init__(self, value): + self.value = value + self.getter = classmethod(value).__get__ + self.name = None + + def __set_name__(self, cls, name): + if self.name is not None: + raise TypeError('already used') + self.name = name + + def __get__(self, obj, cls): + if obj is not None: + raise AttributeError(self.name) + # called on the class + return self.getter(None, cls) diff --git a/Tools/c-analyzer/c_common/fsutil.py b/Tools/c-analyzer/c_common/fsutil.py new file mode 100644 index 0000000..56023f3 --- /dev/null +++ b/Tools/c-analyzer/c_common/fsutil.py @@ -0,0 +1,388 @@ +import fnmatch +import glob +import os +import os.path +import shutil +import stat + +from .iterutil import iter_many + + +C_SOURCE_SUFFIXES = ('.c', '.h') + + +def create_backup(old, backup=None): + if isinstance(old, str): + filename = old + else: + filename = getattr(old, 'name', None) + if not filename: + return None + if not backup or backup is True: + backup = f'{filename}.bak' + try: + shutil.copyfile(filename, backup) + except FileNotFoundError as exc: + if exc.filename != filename: + raise # re-raise + backup = None + return backup + + +################################## +# find files + +def match_glob(filename, pattern): + if fnmatch.fnmatch(filename, pattern): + return True + + # fnmatch doesn't handle ** quite right. It will not match the + # following: + # + # ('x/spam.py', 'x/**/*.py') + # ('spam.py', '**/*.py') + # + # though it *will* match the following: + # + # ('x/y/spam.py', 'x/**/*.py') + # ('x/spam.py', '**/*.py') + + if '**/' not in pattern: + return False + + # We only accommodate the single-"**" case. + return fnmatch.fnmatch(filename, pattern.replace('**/', '', 1)) + + +def iter_filenames(filenames, *, + start=None, + include=None, + exclude=None, + ): + onempty = Exception('no filenames provided') + for filename, solo in iter_many(filenames, onempty): + check, start = _get_check(filename, start, include, exclude) + yield filename, check, solo +# filenames = iter(filenames or ()) +# try: +# first = next(filenames) +# except StopIteration: +# raise Exception('no filenames provided') +# try: +# second = next(filenames) +# except StopIteration: +# check, _ = _get_check(first, start, include, exclude) +# yield first, check, False +# return +# +# check, start = _get_check(first, start, include, exclude) +# yield first, check, True +# check, start = _get_check(second, start, include, exclude) +# yield second, check, True +# for filename in filenames: +# check, start = _get_check(filename, start, include, exclude) +# yield filename, check, True + + +def expand_filenames(filenames): + for filename in filenames: + # XXX Do we need to use glob.escape (a la commit 9355868458, GH-20994)? + if '**/' in filename: + yield from glob.glob(filename.replace('**/', '')) + yield from glob.glob(filename) + + +def _get_check(filename, start, include, exclude): + if start and filename != start: + return (lambda: '<skipped>'), start + else: + def check(): + if _is_excluded(filename, exclude, include): + return '<excluded>' + return None + return check, None + + +def _is_excluded(filename, exclude, include): + if include: + for included in include: + if match_glob(filename, included): + return False + return True + elif exclude: + for excluded in exclude: + if match_glob(filename, excluded): + return True + return False + else: + return False + + +def _walk_tree(root, *, + _walk=os.walk, + ): + # A wrapper around os.walk that resolves the filenames. 
+
+
+def _walk_tree(root, *,
+               _walk=os.walk,
+               ):
+    # A wrapper around os.walk that resolves the filenames.
+    for parent, _, names in _walk(root):
+        for name in names:
+            yield os.path.join(parent, name)
+
+
+def walk_tree(root, *,
+              suffix=None,
+              walk=_walk_tree,
+              ):
+    """Yield each file in the tree under the given directory name.
+
+    If "suffix" is provided then only files with that suffix will
+    be included.
+    """
+    if suffix and not isinstance(suffix, str):
+        raise ValueError('suffix must be a string')
+
+    for filename in walk(root):
+        if suffix and not filename.endswith(suffix):
+            continue
+        yield filename
+
+
+def glob_tree(root, *,
+              suffix=None,
+              _glob=glob.iglob,
+              ):
+    """Yield each file in the tree under the given directory name.
+
+    If "suffix" is provided then only files with that suffix will
+    be included.
+    """
+    suffix = suffix or ''
+    if not isinstance(suffix, str):
+        raise ValueError('suffix must be a string')
+
+    for filename in _glob(f'{root}/*{suffix}'):
+        yield filename
+    for filename in _glob(f'{root}/**/*{suffix}'):
+        yield filename
+
+
+def iter_files(root, suffix=None, relparent=None, *,
+               get_files=os.walk,
+               _glob=glob_tree,
+               _walk=walk_tree,
+               ):
+    """Yield each file in the tree under the given directory name.
+
+    If "root" is a non-string iterable then do the same for each of
+    those trees.
+
+    If "suffix" is provided then only files with that suffix will
+    be included.
+
+    If "relparent" is provided then it is used to resolve each
+    filename as a relative path.
+    """
+    if not isinstance(root, str):
+        roots = root
+        for root in roots:
+            yield from iter_files(root, suffix, relparent,
+                                  get_files=get_files,
+                                  _glob=_glob, _walk=_walk)
+        return
+
+    # Use the right "walk" function.
+    if get_files in (glob.glob, glob.iglob, glob_tree):
+        get_files = _glob
+    else:
+        _files = _walk_tree if get_files in (os.walk, walk_tree) else get_files
+        get_files = (lambda *a, **k: _walk(*a, walk=_files, **k))
+
+    # Handle a single suffix.
+    if suffix and not isinstance(suffix, str):
+        filenames = get_files(root)
+        suffix = tuple(suffix)
+    else:
+        filenames = get_files(root, suffix=suffix)
+        suffix = None
+
+    for filename in filenames:
+        if suffix and not isinstance(suffix, str):  # multiple suffixes
+            if not filename.endswith(suffix):
+                continue
+        if relparent:
+            filename = os.path.relpath(filename, relparent)
+        yield filename
+
+
+def iter_files_by_suffix(root, suffixes, relparent=None, *,
+                         walk=walk_tree,
+                         _iter_files=iter_files,
+                         ):
+    """Yield each file in the tree that has the given suffixes.
+
+    Unlike iter_files(), the results are in the original suffix order.
+    """
+    if isinstance(suffixes, str):
+        suffixes = [suffixes]
+    # XXX Ignore repeated suffixes?
+    for suffix in suffixes:
+        yield from _iter_files(root, suffix, relparent)
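[Editor's sketch of the intended call pattern, run from a CPython checkout;
the directory name is only an example.  C_SOURCE_SUFFIXES is defined above.]

from c_common import fsutil

# Visit every .c file before any .h file, with paths relative to the root:
for filename in fsutil.iter_files_by_suffix('Python', fsutil.C_SOURCE_SUFFIXES,
                                            relparent='.'):
    print(filename)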
+
+
+##################################
+# file info
+
+# XXX posix-only?
+
+S_IRANY = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
+S_IWANY = stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
+S_IXANY = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
+
+
+def is_readable(file, *, user=None, check=False):
+    filename, st, mode = _get_file_info(file)
+    if check:
+        try:
+            okay = _check_file(filename, S_IRANY)
+        except NotImplementedError:
+            okay = NotImplemented
+        if okay is not NotImplemented:
+            return okay
+        # Fall back to checking the mode.
+    return _check_mode(st, mode, S_IRANY, user)
+
+
+def is_writable(file, *, user=None, check=False):
+    filename, st, mode = _get_file_info(file)
+    if check:
+        try:
+            okay = _check_file(filename, S_IWANY)
+        except NotImplementedError:
+            okay = NotImplemented
+        if okay is not NotImplemented:
+            return okay
+        # Fall back to checking the mode.
+    return _check_mode(st, mode, S_IWANY, user)
+
+
+def is_executable(file, *, user=None, check=False):
+    filename, st, mode = _get_file_info(file)
+    if check:
+        try:
+            okay = _check_file(filename, S_IXANY)
+        except NotImplementedError:
+            okay = NotImplemented
+        if okay is not NotImplemented:
+            return okay
+        # Fall back to checking the mode.
+    return _check_mode(st, mode, S_IXANY, user)
+
+
+def _get_file_info(file):
+    filename = st = mode = None
+    if isinstance(file, int):
+        mode = file
+    elif isinstance(file, os.stat_result):
+        st = file
+    else:
+        if isinstance(file, str):
+            filename = file
+        elif hasattr(file, 'name') and os.path.exists(file.name):
+            filename = file.name
+        else:
+            raise NotImplementedError(file)
+        st = os.stat(filename)
+    return filename, st, mode or st.st_mode
+
+
+def _check_file(filename, check):
+    if not isinstance(filename, str):
+        raise Exception(f'filename required to check file, got {filename}')
+    if check & S_IRANY:
+        flags = os.O_RDONLY
+    elif check & S_IWANY:
+        flags = os.O_WRONLY
+    elif check & S_IXANY:
+        # We can worry about S_IXANY later.
+        return NotImplemented
+    else:
+        raise NotImplementedError(check)
+
+    try:
+        fd = os.open(filename, flags)
+    except PermissionError:
+        return False
+    # We do not ignore other exceptions.
+    else:
+        os.close(fd)
+        return True
+
+
+def _get_user_info(user):
+    import pwd
+    username = uid = gid = groups = None
+    if user is None:
+        uid = os.geteuid()
+        #username = os.getlogin()
+        username = pwd.getpwuid(uid)[0]
+        gid = os.getgid()
+        groups = os.getgroups()
+    else:
+        if isinstance(user, int):
+            uid = user
+            entry = pwd.getpwuid(uid)
+            username = entry.pw_name
+        elif isinstance(user, str):
+            username = user
+            entry = pwd.getpwnam(username)
+            uid = entry.pw_uid
+        else:
+            raise NotImplementedError(user)
+        gid = entry.pw_gid
+        groups = os.getgrouplist(username, gid)
+    return username, uid, gid, groups
+
+
+def _check_mode(st, mode, check, user):
+    orig = check
+    _, uid, gid, groups = _get_user_info(user)
+    if check & S_IRANY:
+        check -= S_IRANY
+        matched = False
+        if mode & stat.S_IRUSR:
+            if st.st_uid == uid:
+                matched = True
+        if mode & stat.S_IRGRP:
+            if st.st_gid == gid or st.st_gid in groups:
+                matched = True
+        if mode & stat.S_IROTH:
+            matched = True
+        if not matched:
+            return False
+    if check & S_IWANY:
+        check -= S_IWANY
+        matched = False
+        if mode & stat.S_IWUSR:
+            if st.st_uid == uid:
+                matched = True
+        if mode & stat.S_IWGRP:
+            if st.st_gid == gid or st.st_gid in groups:
+                matched = True
+        if mode & stat.S_IWOTH:
+            matched = True
+        if not matched:
+            return False
+    if check & S_IXANY:
+        check -= S_IXANY
+        matched = False
+        if mode & stat.S_IXUSR:
+            if st.st_uid == uid:
+                matched = True
+        if mode & stat.S_IXGRP:
+            if st.st_gid == gid or st.st_gid in groups:
+                matched = True
+        if mode & stat.S_IXOTH:
+            matched = True
+        if not matched:
+            return False
+    if check:
+        raise NotImplementedError((orig, check))
+    return True
diff --git a/Tools/c-analyzer/c_common/info.py b/Tools/c-analyzer/c_common/info.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/Tools/c-analyzer/c_common/info.py
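[Editor's sketch of the three permission helpers above; POSIX only, and the
path is hypothetical.]

import os
from c_common import fsutil

fsutil.is_readable('/tmp/spam.c')              # decide from the mode bits
fsutil.is_readable('/tmp/spam.c', check=True)  # actually try to open it
st = os.stat('/tmp/spam.c')
fsutil.is_writable(st)                         # a stat result works too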
diff --git a/Tools/c-analyzer/c_common/iterutil.py b/Tools/c-analyzer/c_common/iterutil.py
new file mode 100644
index 0000000..6ded105
--- /dev/null
+++ b/Tools/c-analyzer/c_common/iterutil.py
@@ -0,0 +1,48 @@
+
+_NOT_SET = object()
+
+
+def peek_and_iter(items):
+    if not items:
+        return None, None
+    items = iter(items)
+    try:
+        peeked = next(items)
+    except StopIteration:
+        return None, None
+    def chain():
+        yield peeked
+        yield from items
+    return chain(), peeked
+
+
+def iter_many(items, onempty=None):
+    if not items:
+        if onempty is None:
+            return
+        if not callable(onempty):
+            raise onempty
+        items = onempty(items)
+        yield from iter_many(items, onempty=None)
+        return
+    items = iter(items)
+    try:
+        first = next(items)
+    except StopIteration:
+        if onempty is None:
+            return
+        if not callable(onempty):
+            raise onempty
+        items = onempty(items)
+        yield from iter_many(items, onempty=None)
+    else:
+        try:
+            second = next(items)
+        except StopIteration:
+            yield first, False
+            return
+        else:
+            yield first, True
+            yield second, True
+            for item in items:
+                yield item, True
diff --git a/Tools/c-analyzer/c_common/logging.py b/Tools/c-analyzer/c_common/logging.py
new file mode 100644
index 0000000..12398f7
--- /dev/null
+++ b/Tools/c-analyzer/c_common/logging.py
@@ -0,0 +1,63 @@
+import logging
+import sys
+
+
+VERBOSITY = 3
+
+
+# The root logger for the whole top-level package:
+_logger = logging.getLogger(__name__.rpartition('.')[0])
+
+
+def configure_logger(logger, verbosity=VERBOSITY, *,
+                     logfile=None,
+                     maxlevel=logging.CRITICAL,
+                     ):
+    level = max(1,  # 0 disables it, so we use the next lowest.
+                min(maxlevel,
+                    maxlevel - verbosity * 10))
+    logger.setLevel(level)
+    #logger.propagate = False
+
+    if not logger.handlers:
+        if logfile:
+            handler = logging.FileHandler(logfile)
+        else:
+            handler = logging.StreamHandler(sys.stdout)
+        handler.setLevel(level)
+        #handler.setFormatter(logging.Formatter())
+        logger.addHandler(handler)
+
+    # In case the provided logger is in a sub-package...
+    if logger is not _logger:
+        configure_logger(
+            _logger,
+            verbosity,
+            logfile=logfile,
+            maxlevel=maxlevel,
+        )
+
+
+def hide_emit_errors():
+    """Ignore errors while emitting log entries.
+
+    Rather than printing a message describing the error, we show nothing.
+    """
+    # For now we simply ignore all exceptions.  If we wanted to ignore
+    # specific ones (e.g. BrokenPipeError) then we would need to use
+    # a Handler subclass with a custom handleError() method.
+    orig = logging.raiseExceptions
+    logging.raiseExceptions = False
+    def restore():
+        logging.raiseExceptions = orig
+    return restore
+
+
+class Printer:
+    def __init__(self, verbosity=VERBOSITY):
+        self.verbosity = verbosity
+
+    def info(self, *args, **kwargs):
+        if self.verbosity < 3:
+            return
+        print(*args, **kwargs)
diff --git a/Tools/c-analyzer/c_common/misc.py b/Tools/c-analyzer/c_common/misc.py
new file mode 100644
index 0000000..bfd503a
--- /dev/null
+++ b/Tools/c-analyzer/c_common/misc.py
@@ -0,0 +1,7 @@
+
+class Labeled:
+    __slots__ = ('_label',)
+    def __init__(self, label):
+        self._label = label
+    def __repr__(self):
+        return f'<{self._label}>'
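[Editor's sketch: the (item, ismany) protocol of iter_many(), shown with
small inputs.]

from c_common.iterutil import iter_many

list(iter_many([]))            # []
list(iter_many(['a']))         # [('a', False)]
list(iter_many(['a', 'b']))    # [('a', True), ('b', True)]
# A non-callable onempty is raised when there is nothing to yield:
list(iter_many([], Exception('no items')))   # raises Exception: no items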
diff --git a/Tools/c-analyzer/c_common/scriptutil.py b/Tools/c-analyzer/c_common/scriptutil.py
new file mode 100644
index 0000000..939a850
--- /dev/null
+++ b/Tools/c-analyzer/c_common/scriptutil.py
@@ -0,0 +1,577 @@
+import argparse
+import contextlib
+import fnmatch
+import logging
+import os
+import os.path
+import shutil
+import sys
+
+from . import fsutil, strutil, iterutil, logging as loggingutil
+
+
+def get_prog(spec=None, *, absolute=False, allowsuffix=True):
+    if spec is None:
+        _, spec = _find_script()
+        # This is more natural for prog than __file__ would be.
+        filename = sys.argv[0]
+    elif isinstance(spec, str):
+        filename = os.path.normpath(spec)
+        spec = None
+    else:
+        filename = spec.origin
+    if _is_standalone(filename):
+        # Check if "installed".
+        if allowsuffix or not filename.endswith('.py'):
+            basename = os.path.basename(filename)
+            found = shutil.which(basename)
+            if found:
+                script = os.path.abspath(filename)
+                found = os.path.abspath(found)
+                if os.path.normcase(script) == os.path.normcase(found):
+                    return basename
+        # It is only "standalone".
+        if absolute:
+            filename = os.path.abspath(filename)
+        return filename
+    elif spec is not None:
+        module = spec.name
+        if module.endswith('.__main__'):
+            module = module[:-9]
+        return f'{sys.executable} -m {module}'
+    else:
+        if absolute:
+            filename = os.path.abspath(filename)
+        return f'{sys.executable} {filename}'
+
+
+def _find_script():
+    frame = sys._getframe(2)
+    while frame.f_globals['__name__'] != '__main__':
+        frame = frame.f_back
+
+    # This should match sys.argv[0].
+    filename = frame.f_globals['__file__']
+    # This will be None if -m wasn't used.
+    spec = frame.f_globals['__spec__']
+    return filename, spec
+
+
+def is_installed(filename, *, allowsuffix=True):
+    if not allowsuffix and filename.endswith('.py'):
+        return False
+    filename = os.path.abspath(os.path.normpath(filename))
+    found = shutil.which(os.path.basename(filename))
+    if not found:
+        return False
+    if found != filename:
+        return False
+    return _is_standalone(filename)
+
+
+def is_standalone(filename):
+    filename = os.path.abspath(os.path.normpath(filename))
+    return _is_standalone(filename)
+
+
+def _is_standalone(filename):
+    return fsutil.is_executable(filename)
+
+
+##################################
+# logging
+
+VERBOSITY = 3
+
+TRACEBACK = os.environ.get('SHOW_TRACEBACK', '').strip()
+TRACEBACK = bool(TRACEBACK and TRACEBACK.upper() not in ('0', 'FALSE', 'NO'))
+
+
+logger = logging.getLogger(__name__)
+
+
+def configure_logger(verbosity, logger=None, **kwargs):
+    if logger is None:
+        # Configure the root logger.
+        logger = logging.getLogger()
+    loggingutil.configure_logger(logger, verbosity, **kwargs)
+
+
+##################################
+# selections
+
+class UnsupportedSelectionError(Exception):
+    def __init__(self, values, possible):
+        self.values = tuple(values)
+        self.possible = tuple(possible)
+        super().__init__(f'unsupported selections {self.unique}')
+
+    @property
+    def unique(self):
+        return tuple(sorted(set(self.values)))
+
+
+def normalize_selection(selected: str, *, possible=None):
+    if selected in (None, True, False):
+        return selected
+    elif isinstance(selected, str):
+        selected = [selected]
+    elif not selected:
+        return ()
+
+    unsupported = []
+    _selected = set()
+    for item in selected:
+        if not item:
+            continue
+        for value in item.strip().replace(',', ' ').split():
+            if not value:
+                continue
+            # XXX Handle subtraction (leading "-").
+            if possible and value not in possible and value != 'all':
+                unsupported.append(value)
+            _selected.add(value)
+    if unsupported:
+        raise UnsupportedSelectionError(unsupported, tuple(possible))
+    if 'all' in _selected:
+        return True
+    return frozenset(_selected)
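[Editor's sketch: how selections normalize, given the frozenset(_selected)
fix above so that the split values (not the raw strings) are returned.]

from c_common.scriptutil import normalize_selection

normalize_selection('a,b', possible=('a', 'b', 'c'))    # frozenset({'a', 'b'})
normalize_selection(['a', 'all'], possible=('a', 'b'))  # True
normalize_selection('d', possible=('a', 'b'))  # raises UnsupportedSelectionError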
+
+
+##################################
+# CLI parsing helpers
+
+class CLIArgSpec(tuple):
+    def __new__(cls, *args, **kwargs):
+        return super().__new__(cls, (args, kwargs))
+
+    def __repr__(self):
+        args, kwargs = self
+        args = [repr(arg) for arg in args]
+        for name, value in kwargs.items():
+            args.append(f'{name}={value!r}')
+        return f'{type(self).__name__}({", ".join(args)})'
+
+    def __call__(self, parser, *, _noop=(lambda a: None)):
+        self.apply(parser)
+        return _noop
+
+    def apply(self, parser):
+        args, kwargs = self
+        parser.add_argument(*args, **kwargs)
+
+
+def apply_cli_argspecs(parser, specs):
+    processors = []
+    for spec in specs:
+        if callable(spec):
+            procs = spec(parser)
+            _add_procs(processors, procs)
+        else:
+            args, kwargs = spec
+            parser.add_argument(*args, **kwargs)
+    return processors
+
+
+def _add_procs(flattened, procs):
+    # XXX Fail on non-empty, non-callable procs?
+    if not procs:
+        return
+    if callable(procs):
+        flattened.append(procs)
+    else:
+        #processors.extend(p for p in procs if callable(p))
+        for proc in procs:
+            _add_procs(flattened, proc)
+
+
+def add_verbosity_cli(parser):
+    parser.add_argument('-q', '--quiet', action='count', default=0)
+    parser.add_argument('-v', '--verbose', action='count', default=0)
+
+    def process_args(args):
+        ns = vars(args)
+        key = 'verbosity'
+        if key in ns:
+            parser.error(f'duplicate arg {key!r}')
+        ns[key] = max(0, VERBOSITY + ns.pop('verbose') - ns.pop('quiet'))
+        return key
+    return process_args
+
+
+def add_traceback_cli(parser):
+    parser.add_argument('--traceback', '--tb', action='store_true',
+                        default=TRACEBACK)
+    parser.add_argument('--no-traceback', '--no-tb', dest='traceback',
+                        action='store_const', const=False)
+
+    def process_args(args):
+        ns = vars(args)
+        key = 'traceback_cm'
+        if key in ns:
+            parser.error(f'duplicate arg {key!r}')
+        showtb = ns.pop('traceback')
+
+        @contextlib.contextmanager
+        def traceback_cm():
+            restore = loggingutil.hide_emit_errors()
+            try:
+                yield
+            except BrokenPipeError:
+                # It was piped to "head" or something similar.
+                pass
+            except NotImplementedError:
+                raise  # re-raise
+            except Exception as exc:
+                if not showtb:
+                    sys.exit(f'ERROR: {exc}')
+                raise  # re-raise
+            except KeyboardInterrupt:
+                if not showtb:
+                    sys.exit('\nINTERRUPTED')
+                raise  # re-raise
+            except BaseException as exc:
+                if not showtb:
+                    sys.exit(f'{type(exc).__name__}: {exc}')
+                raise  # re-raise
+            finally:
+                restore()
+        ns[key] = traceback_cm()
+        return key
+    return process_args
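[Editor's sketch: the processors returned by these helpers feed
process_args_by_key(), defined further down in this file.]

import argparse
from c_common import scriptutil

parser = argparse.ArgumentParser()
processors = [
    scriptutil.add_verbosity_cli(parser),
    scriptutil.add_traceback_cli(parser),
]
args = parser.parse_args(['-vv'])
verbosity, traceback_cm = scriptutil.process_args_by_key(
    args, processors, ['verbosity', 'traceback_cm'])
print(verbosity)   # 5  (the default of 3, plus 2 for -vv)
with traceback_cm:
    ...  # run the command; exceptions are reported per --traceback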
+
+
+def add_sepval_cli(parser, opt, dest, choices, *, sep=',', **kwargs):
+#    if opt is True:
+#        parser.add_argument(f'--{dest}', action='append', **kwargs)
+#    elif isinstance(opt, str) and opt.startswith('-'):
+#        parser.add_argument(opt, dest=dest, action='append', **kwargs)
+#    else:
+#        arg = dest if not opt else opt
+#        kwargs.setdefault('nargs', '+')
+#        parser.add_argument(arg, dest=dest, action='append', **kwargs)
+    if not isinstance(opt, str):
+        parser.error(f'opt must be a string, got {opt!r}')
+    elif opt.startswith('-'):
+        parser.add_argument(opt, dest=dest, action='append', **kwargs)
+    else:
+        kwargs.setdefault('nargs', '+')
+        #kwargs.setdefault('metavar', opt.upper())
+        parser.add_argument(opt, dest=dest, action='append', **kwargs)
+
+    def process_args(args):
+        ns = vars(args)
+
+        # XXX Use normalize_selection()?
+        if isinstance(ns[dest], str):
+            ns[dest] = [ns[dest]]
+        selections = []
+        for many in ns[dest] or ():
+            for value in many.split(sep):
+                if value not in choices:
+                    parser.error(f'unknown {dest} {value!r}')
+                selections.append(value)
+        ns[dest] = selections
+    return process_args
+
+
+def add_files_cli(parser, *, excluded=None, nargs=None):
+    process_files = add_file_filtering_cli(parser, excluded=excluded)
+    parser.add_argument('filenames', nargs=nargs or '+', metavar='FILENAME')
+    return [
+        process_files,
+    ]
+
+
+def add_file_filtering_cli(parser, *, excluded=None):
+    parser.add_argument('--start')
+    parser.add_argument('--include', action='append')
+    parser.add_argument('--exclude', action='append')
+
+    excluded = tuple(excluded or ())
+
+    def process_args(args):
+        ns = vars(args)
+        key = 'iter_filenames'
+        if key in ns:
+            parser.error(f'duplicate arg {key!r}')
+
+        _include = tuple(ns.pop('include') or ())
+        _exclude = excluded + tuple(ns.pop('exclude') or ())
+        kwargs = dict(
+            start=ns.pop('start'),
+            include=tuple(_parse_files(_include)),
+            exclude=tuple(_parse_files(_exclude)),
+            # We use the default for "show_header".
+        )
+        ns[key] = (lambda files: fsutil.iter_filenames(files, **kwargs))
+        return key
+    return process_args
+
+
+def _parse_files(filenames):
+    for filename, _ in strutil.parse_entries(filenames):
+        yield filename.strip()
+
+
+def add_failure_filtering_cli(parser, pool, *, default=False):
+    parser.add_argument('--fail', action='append',
+                        metavar=f'"{{all|{"|".join(sorted(pool))}}},..."')
+    parser.add_argument('--no-fail', dest='fail', action='store_const', const=())
+
+    def process_args(args):
+        ns = vars(args)
+
+        fail = ns.pop('fail')
+        try:
+            fail = normalize_selection(fail, possible=pool)
+        except UnsupportedSelectionError as exc:
+            parser.error(f'invalid --fail values: {", ".join(exc.unique)}')
+        else:
+            if fail is None:
+                fail = default
+
+        if fail is True:
+            def ignore_exc(_exc):
+                return False
+        elif fail is False:
+            def ignore_exc(_exc):
+                return True
+        else:
+            def ignore_exc(exc):
+                for err in fail:
+                    if type(exc) == pool[err]:
+                        return False
+                else:
+                    return True
+        args.ignore_exc = ignore_exc
+    return process_args
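[Editor's sketch of add_sepval_cli() in use; the option name and choices are
hypothetical.]

import argparse
from c_common import scriptutil

parser = argparse.ArgumentParser()
proc = scriptutil.add_sepval_cli(parser, '--groups', 'groups', {'a', 'b', 'c'})
args = parser.parse_args(['--groups', 'a,c', '--groups', 'b'])
proc(args)
print(args.groups)   # ['a', 'c', 'b']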
+
+
+def add_kind_filtering_cli(parser, *, default=None):
+    parser.add_argument('--kinds', action='append')
+
+    def process_args(args):
+        ns = vars(args)
+
+        kinds = []
+        for kind in ns.pop('kinds') or default or ():
+            kinds.extend(kind.strip().replace(',', ' ').split())
+
+        if not kinds:
+            match_kind = (lambda k: True)
+        else:
+            included = set()
+            excluded = set()
+            for kind in kinds:
+                if kind.startswith('-'):
+                    kind = kind[1:]
+                    excluded.add(kind)
+                    if kind in included:
+                        included.remove(kind)
+                else:
+                    included.add(kind)
+                    if kind in excluded:
+                        excluded.remove(kind)
+            if excluded:
+                if included:
+                    ...  # XXX fail?
+                def match_kind(kind, *, _excluded=excluded):
+                    return kind not in _excluded
+            else:
+                def match_kind(kind, *, _included=included):
+                    return kind in _included
+        args.match_kind = match_kind
+    return process_args
+
+
+COMMON_CLI = [
+    add_verbosity_cli,
+    add_traceback_cli,
+    #add_dryrun_cli,
+]
+
+
+def add_commands_cli(parser, commands, *, commonspecs=COMMON_CLI, subset=None):
+    arg_processors = {}
+    if isinstance(subset, str):
+        cmdname = subset
+        try:
+            _, argspecs, _ = commands[cmdname]
+        except KeyError:
+            raise ValueError(f'unsupported subset {subset!r}')
+        parser.set_defaults(cmd=cmdname)
+        arg_processors[cmdname] = _add_cmd_cli(parser, commonspecs, argspecs)
+    else:
+        if subset is None:
+            cmdnames = subset = list(commands)
+        elif not subset:
+            raise NotImplementedError
+        elif isinstance(subset, set):
+            cmdnames = [k for k in commands if k in subset]
+            subset = sorted(subset)
+        else:
+            cmdnames = [n for n in subset if n in commands]
+        if len(cmdnames) < len(subset):
+            bad = tuple(n for n in subset if n not in commands)
+            raise ValueError(f'unsupported subset {bad}')
+
+        common = argparse.ArgumentParser(add_help=False)
+        common_processors = apply_cli_argspecs(common, commonspecs)
+        subs = parser.add_subparsers(dest='cmd')
+        for cmdname in cmdnames:
+            description, argspecs, _ = commands[cmdname]
+            sub = subs.add_parser(
+                cmdname,
+                description=description,
+                parents=[common],
+            )
+            cmd_processors = _add_cmd_cli(sub, (), argspecs)
+            arg_processors[cmdname] = common_processors + cmd_processors
+    return arg_processors
+
+
+def _add_cmd_cli(parser, commonspecs, argspecs):
+    processors = []
+    argspecs = list(commonspecs or ()) + list(argspecs or ())
+    for argspec in argspecs:
+        if callable(argspec):
+            procs = argspec(parser)
+            _add_procs(processors, procs)
+        else:
+            if not argspec:
+                raise NotImplementedError
+            args = list(argspec)
+            if not isinstance(args[-1], str):
+                kwargs = args.pop()
+                if not isinstance(args[0], str):
+                    try:
+                        args, = args
+                    except (TypeError, ValueError):
+                        parser.error(f'invalid cmd args {argspec!r}')
+            else:
+                kwargs = {}
+            parser.add_argument(*args, **kwargs)
+            # There will be nothing to process.
+    return processors
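[Editor's sketch of the kind filter produced by add_kind_filtering_cli();
the kind names are hypothetical.  Note the "--kinds=-typedef" form, since
argparse would otherwise treat a leading "-" as an option.]

import argparse
from c_common import scriptutil

parser = argparse.ArgumentParser()
proc = scriptutil.add_kind_filtering_cli(parser)
args = parser.parse_args(['--kinds=-typedef'])
proc(args)
print(args.match_kind('variable'))   # True  (only exclusions were given)
print(args.match_kind('typedef'))    # False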
+
+
+def _flatten_processors(processors):
+    for proc in processors:
+        if proc is None:
+            continue
+        if callable(proc):
+            yield proc
+        else:
+            yield from _flatten_processors(proc)
+
+
+def process_args(args, processors, *, keys=None):
+    processors = _flatten_processors(processors)
+    ns = vars(args)
+    extracted = {}
+    if keys is None:
+        for process_args in processors:
+            for key in process_args(args):
+                extracted[key] = ns.pop(key)
+    else:
+        remainder = set(keys)
+        for process_args in processors:
+            hanging = process_args(args)
+            if isinstance(hanging, str):
+                hanging = [hanging]
+            for key in hanging or ():
+                if key not in remainder:
+                    raise NotImplementedError(key)
+                extracted[key] = ns.pop(key)
+                remainder.remove(key)
+        if remainder:
+            raise NotImplementedError(sorted(remainder))
+    return extracted
+
+
+def process_args_by_key(args, processors, keys):
+    extracted = process_args(args, processors, keys=keys)
+    return [extracted[key] for key in keys]
+
+
+##################################
+# commands
+
+def set_command(name, add_cli):
+    """A decorator factory to set CLI info."""
+    def decorator(func):
+        if hasattr(func, '__cli__'):
+            raise Exception('already set')
+        func.__cli__ = (name, add_cli)
+        return func
+    return decorator
+
+
+##################################
+# main() helpers
+
+def filter_filenames(filenames, iter_filenames=None):
+    for filename, check, _ in _iter_filenames(filenames, iter_filenames):
+        if (reason := check()):
+            logger.debug(f'{filename}: {reason}')
+            continue
+        yield filename
+
+
+def main_for_filenames(filenames, iter_filenames=None):
+    for filename, check, show in _iter_filenames(filenames, iter_filenames):
+        if show:
+            print()
+            print('-------------------------------------------')
+            print(filename)
+        if (reason := check()):
+            print(reason)
+            continue
+        yield filename
+
+
+def _iter_filenames(filenames, iter_files):
+    if iter_files is None:
+        iter_files = fsutil.iter_filenames
+        yield from iter_files(filenames)
+        return
+
+    onempty = Exception('no filenames provided')
+    items = iter_files(filenames)
+    items, peeked = iterutil.peek_and_iter(items)
+    if not items:
+        raise onempty
+    if isinstance(peeked, str):
+        check = (lambda: None)
+        for filename, ismany in iterutil.iter_many(items, onempty):
+            yield filename, check, ismany
+    elif len(peeked) == 3:
+        yield from items
+    else:
+        raise NotImplementedError
+
+
+def iter_marks(mark='.', *, group=5, groups=2, lines=10, sep=' '):
+    mark = mark or ''
+    sep = f'{mark}{sep}' if sep else mark
+    end = f'{mark}{os.linesep}'
+    div = os.linesep
+    perline = group * groups
+    perlines = perline * lines
+
+    if perline == 1:
+        yield end
+    elif group == 1:
+        yield sep
+
+    count = 1
+    while True:
+        if count % perline == 0:
+            yield end
+            if count % perlines == 0:
+                yield div
+        elif count % group == 0:
+            yield sep
+        else:
+            yield mark
+        count += 1
diff --git a/Tools/c-analyzer/c_common/show.py b/Tools/c-analyzer/c_common/show.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/Tools/c-analyzer/c_common/show.py
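[Editor's sketch: what iter_marks() emits with its defaults of '.' marks in
two groups of five per line.]

import itertools
from c_common.scriptutil import iter_marks

for mark in itertools.islice(iter_marks(), 10):
    print(mark, end='')
# Output: "..... ....." plus a newline (the 10th mark ends the line).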
diff --git a/Tools/c-analyzer/c_common/strutil.py b/Tools/c-analyzer/c_common/strutil.py
new file mode 100644
index 0000000..e7535d4
--- /dev/null
+++ b/Tools/c-analyzer/c_common/strutil.py
@@ -0,0 +1,42 @@
+import logging
+
+
+logger = logging.getLogger(__name__)
+
+
+def unrepr(value):
+    raise NotImplementedError
+
+
+def parse_entries(entries, *, ignoresep=None):
+    for entry in entries:
+        if ignoresep and ignoresep in entry:
+            subentries = [entry]
+        else:
+            subentries = entry.strip().replace(',', ' ').split()
+        for item in subentries:
+            if item.startswith('+'):
+                filename = item[1:]
+                try:
+                    infile = open(filename)
+                except FileNotFoundError:
+                    logger.debug(f'ignored in parse_entries(): +{filename}')
+                    return
+                with infile:
+                    # We read the entire file here to ensure the file
+                    # gets closed sooner rather than later.  Note that
+                    # the file would stay open if this iterator is never
+                    # exhausted.
+                    lines = infile.read().splitlines()
+                for line in _iter_significant_lines(lines):
+                    yield line, filename
+            else:
+                yield item, None
+
+
+def _iter_significant_lines(lines):
+    for line in lines:
+        line = line.partition('#')[0]
+        if not line.strip():
+            continue
+        yield line
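[Editor's sketch: parse_entries() splits comma/space-separated entries and
expands "+filename" references; the file name is hypothetical.]

from c_common import strutil

list(strutil.parse_entries(['spam,eggs']))
# [('spam', None), ('eggs', None)]
# An entry like '+extra.txt' instead yields the significant (non-comment)
# lines of extra.txt, each paired with that filename.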
diff --git a/Tools/c-analyzer/c_common/tables.py b/Tools/c-analyzer/c_common/tables.py
new file mode 100644
index 0000000..70a230a
--- /dev/null
+++ b/Tools/c-analyzer/c_common/tables.py
@@ -0,0 +1,213 @@
+import csv
+
+from . import NOT_SET, strutil, fsutil
+
+
+EMPTY = '-'
+UNKNOWN = '???'
+
+
+def parse_markers(markers, default=None):
+    if markers is NOT_SET:
+        return default
+    if not markers:
+        return None
+    if type(markers) is not str:
+        return markers
+    if markers == markers[0] * len(markers):
+        return [markers]
+    return list(markers)
+
+
+def fix_row(row, **markers):
+    if isinstance(row, str):
+        raise NotImplementedError(row)
+    empty = parse_markers(markers.pop('empty', ('-',)))
+    unknown = parse_markers(markers.pop('unknown', ('???',)))
+    row = (val if val else None for val in row)
+    if not empty:
+        if not unknown:
+            return row
+        return (UNKNOWN if val in unknown else val for val in row)
+    elif not unknown:
+        return (EMPTY if val in empty else val for val in row)
+    return (EMPTY if val in empty else (UNKNOWN if val in unknown else val)
+            for val in row)
+
+
+def _fix_read_default(row):
+    for value in row:
+        yield value.strip()
+
+
+def _fix_write_default(row, empty=''):
+    for value in row:
+        yield empty if value is None else str(value)
+
+
+def _normalize_fix_read(fix):
+    if fix is None:
+        fix = ''
+    if callable(fix):
+        def fix_row(row):
+            values = fix(row)
+            return _fix_read_default(values)
+    elif isinstance(fix, str):
+        def fix_row(row):
+            values = _fix_read_default(row)
+            return (None if v == fix else v
+                    for v in values)
+    else:
+        raise NotImplementedError(fix)
+    return fix_row
+
+
+def _normalize_fix_write(fix, empty=''):
+    if fix is None:
+        fix = empty
+    if callable(fix):
+        def fix_row(row):
+            values = fix(row)
+            return _fix_write_default(values, empty)
+    elif isinstance(fix, str):
+        def fix_row(row):
+            return _fix_write_default(row, fix)
+    else:
+        raise NotImplementedError(fix)
+    return fix_row
+
+
+def read_table(infile, header, *,
+               sep='\t',
+               fix=None,
+               _open=open,
+               _get_reader=csv.reader,
+               ):
+    """Yield each row of the given ???-separated (e.g. tab) file."""
+    if isinstance(infile, str):
+        with _open(infile, newline='') as infile:
+            yield from read_table(
+                infile,
+                header,
+                sep=sep,
+                fix=fix,
+                _open=_open,
+                _get_reader=_get_reader,
+            )
+        return
+    lines = strutil._iter_significant_lines(infile)
+
+    # Validate the header.
+    if not isinstance(header, str):
+        header = sep.join(header)
+    try:
+        actualheader = next(lines).strip()
+    except StopIteration:
+        actualheader = ''
+    if actualheader != header:
+        raise ValueError(f'bad header {actualheader!r}')
+
+    fix_row = _normalize_fix_read(fix)
+    for row in _get_reader(lines, delimiter=sep or '\t'):
+        yield tuple(fix_row(row))
+
+
+def write_table(outfile, header, rows, *,
+                sep='\t',
+                fix=None,
+                backup=True,
+                _open=open,
+                _get_writer=csv.writer,
+                ):
+    """Write each of the rows to the given ???-separated (e.g. tab) file."""
+    if backup:
+        fsutil.create_backup(outfile, backup)
+    if isinstance(outfile, str):
+        with _open(outfile, 'w', newline='') as outfile:
+            return write_table(
+                outfile,
+                header,
+                rows,
+                sep=sep,
+                fix=fix,
+                backup=backup,
+                _open=_open,
+                _get_writer=_get_writer,
+            )
+
+    if isinstance(header, str):
+        header = header.split(sep or '\t')
+    fix_row = _normalize_fix_write(fix)
+    writer = _get_writer(outfile, delimiter=sep or '\t')
+    writer.writerow(header)
+    for row in rows:
+        writer.writerow(
+            tuple(fix_row(row))
+        )
+
+
+def parse_table(entries, sep, header=None, rawsep=None, *,
+                default=NOT_SET,
+                strict=True,
+                ):
+    header, sep = _normalize_table_file_props(header, sep)
+    if not sep:
+        raise ValueError('missing "sep"')
+
+    ncols = None
+    if header:
+        if strict:
+            ncols = len(header.split(sep))
+        cur_file = None
+    for line, filename in strutil.parse_entries(entries, ignoresep=sep):
+        _sep = sep
+        if filename:
+            if header and cur_file != filename:
+                cur_file = filename
+                # Skip the first line if it's the header.
+                if line.strip() == header:
+                    continue
+                else:
+                    # We expected the header.
+                    raise NotImplementedError((header, line))
+            elif rawsep and sep not in line:
+                _sep = rawsep
+
+        row = _parse_row(line, _sep, ncols, default)
+        if strict and not ncols:
+            ncols = len(row)
+        yield row, filename
+
+
+def parse_row(line, sep, *, ncols=None, default=NOT_SET):
+    if not sep:
+        raise ValueError('missing "sep"')
+    return _parse_row(line, sep, ncols, default)
+
+
+def _parse_row(line, sep, ncols, default):
+    row = tuple(v.strip() for v in line.split(sep))
+    if (ncols or 0) > 0:
+        diff = ncols - len(row)
+        if diff:
+            if default is NOT_SET or diff < 0:
+                raise Exception(f'bad row (expected {ncols} columns, got {row!r})')
+            row += (default,) * diff
+    return row
+
+
+def _normalize_table_file_props(header, sep):
+    if not header:
+        return None, sep
+
+    if not isinstance(header, str):
+        if not sep:
+            raise NotImplementedError(header)
+        header = sep.join(header)
+    elif not sep:
+        for sep in ('\t', ',', ' '):
+            if sep in header:
+                break
+        else:
+            sep = None
+    return header, sep
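[Editor's sketch: a write_table()/read_table() round trip; the file name is
hypothetical.  None is written as the '' marker and read back as None.]

from c_common import tables

header = 'name\tkind'
rows = [('spam', 'func'), ('ham', None)]
tables.write_table('demo.tsv', header, rows, backup=False)
list(tables.read_table('demo.tsv', header))
# [('spam', 'func'), ('ham', None)]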