Diffstat (limited to 'Tools/c-analyzer/c_common')
-rw-r--r--  Tools/c-analyzer/c_common/__init__.py      2
-rw-r--r--  Tools/c-analyzer/c_common/clsutil.py      117
-rw-r--r--  Tools/c-analyzer/c_common/fsutil.py       388
-rw-r--r--  Tools/c-analyzer/c_common/info.py           0
-rw-r--r--  Tools/c-analyzer/c_common/iterutil.py      48
-rw-r--r--  Tools/c-analyzer/c_common/logging.py       63
-rw-r--r--  Tools/c-analyzer/c_common/misc.py           7
-rw-r--r--  Tools/c-analyzer/c_common/scriptutil.py   577
-rw-r--r--  Tools/c-analyzer/c_common/show.py           0
-rw-r--r--  Tools/c-analyzer/c_common/strutil.py       42
-rw-r--r--  Tools/c-analyzer/c_common/tables.py       213
11 files changed, 1457 insertions, 0 deletions
diff --git a/Tools/c-analyzer/c_common/__init__.py b/Tools/c-analyzer/c_common/__init__.py
new file mode 100644
index 0000000..a4c3bb2
--- /dev/null
+++ b/Tools/c-analyzer/c_common/__init__.py
@@ -0,0 +1,2 @@
+
+NOT_SET = object()
diff --git a/Tools/c-analyzer/c_common/clsutil.py b/Tools/c-analyzer/c_common/clsutil.py
new file mode 100644
index 0000000..aa5f6b9
--- /dev/null
+++ b/Tools/c-analyzer/c_common/clsutil.py
@@ -0,0 +1,117 @@
+
+_NOT_SET = object()
+
+
+class Slot:
+ """A descriptor that provides a slot.
+
+ This is useful for types that can't have slots via __slots__,
+ e.g. tuple subclasses.
+ """
+
+ __slots__ = ('initial', 'default', 'readonly', 'instances', 'name')
+
+ def __init__(self, initial=_NOT_SET, *,
+ default=_NOT_SET,
+ readonly=False,
+ ):
+ self.initial = initial
+ self.default = default
+ self.readonly = readonly
+
+ # The instance cache is not inherently tied to the normal
+        # lifetime of the instances, so we must do something to avoid
+        # keeping the instances alive by holding a reference here.
+ # Ideally we would use weakref.WeakValueDictionary to do this.
+ # However, most builtin types do not support weakrefs. So
+ # instead we monkey-patch __del__ on the attached class to clear
+ # the instance.
+ self.instances = {}
+ self.name = None
+
+ def __set_name__(self, cls, name):
+ if self.name is not None:
+ raise TypeError('already used')
+ self.name = name
+ try:
+ slotnames = cls.__slot_names__
+ except AttributeError:
+ slotnames = cls.__slot_names__ = []
+ slotnames.append(name)
+ self._ensure___del__(cls, slotnames)
+
+ def __get__(self, obj, cls):
+ if obj is None: # called on the class
+ return self
+ try:
+ value = self.instances[id(obj)]
+ except KeyError:
+ if self.initial is _NOT_SET:
+ value = self.default
+ else:
+ value = self.initial
+ self.instances[id(obj)] = value
+ if value is _NOT_SET:
+ raise AttributeError(self.name)
+ # XXX Optionally make a copy?
+ return value
+
+ def __set__(self, obj, value):
+ if self.readonly:
+ raise AttributeError(f'{self.name} is readonly')
+ # XXX Optionally coerce?
+ self.instances[id(obj)] = value
+
+ def __delete__(self, obj):
+ if self.readonly:
+ raise AttributeError(f'{self.name} is readonly')
+ self.instances[id(obj)] = self.default # XXX refleak?
+
+ def _ensure___del__(self, cls, slotnames): # See the comment in __init__().
+ try:
+ old___del__ = cls.__del__
+ except AttributeError:
+ old___del__ = (lambda s: None)
+ else:
+ if getattr(old___del__, '_slotted', False):
+ return
+
+ def __del__(_self):
+ for name in slotnames:
+ delattr(_self, name)
+ old___del__(_self)
+ __del__._slotted = True
+ cls.__del__ = __del__
+
+ def set(self, obj, value):
+ """Update the cached value for an object.
+
+ This works even if the descriptor is read-only. This is
+ particularly useful when initializing the object (e.g. in
+ its __new__ or __init__).
+ """
+ self.instances[id(obj)] = value
+
+
+class classonly:
+ """A non-data descriptor that makes a value only visible on the class.
+
+ This is like the "classmethod" builtin, but does not show up on
+ instances of the class. It may be used as a decorator.
+ """
+
+ def __init__(self, value):
+ self.value = value
+ self.getter = classmethod(value).__get__
+ self.name = None
+
+ def __set_name__(self, cls, name):
+ if self.name is not None:
+ raise TypeError('already used')
+ self.name = name
+
+ def __get__(self, obj, cls):
+ if obj is not None:
+ raise AttributeError(self.name)
+ # called on the class
+ return self.getter(None, cls)
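
For context, a minimal usage sketch of Slot and classonly (the Point
class and its "label" attribute are illustrative only, and this assumes
the c_common package is importable):

    from c_common.clsutil import Slot, classonly

    class Point(tuple):
        # tuple subclasses cannot grow attributes via __slots__,
        # so Slot keeps the value in its own per-instance cache.
        label = Slot(default=None)

        @classonly
        def origin(cls):
            return cls((0, 0))

    p = Point((1, 2))
    p.label = 'spam'                 # stored under id(p) in the descriptor
    assert p.label == 'spam'
    assert Point.origin() == (0, 0)  # visible on the class only;
                                     # p.origin raises AttributeError
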
diff --git a/Tools/c-analyzer/c_common/fsutil.py b/Tools/c-analyzer/c_common/fsutil.py
new file mode 100644
index 0000000..56023f3
--- /dev/null
+++ b/Tools/c-analyzer/c_common/fsutil.py
@@ -0,0 +1,388 @@
+import fnmatch
+import glob
+import os
+import os.path
+import shutil
+import stat
+
+from .iterutil import iter_many
+
+
+C_SOURCE_SUFFIXES = ('.c', '.h')
+
+
+def create_backup(old, backup=None):
+ if isinstance(old, str):
+ filename = old
+ else:
+ filename = getattr(old, 'name', None)
+ if not filename:
+ return None
+ if not backup or backup is True:
+ backup = f'{filename}.bak'
+ try:
+ shutil.copyfile(filename, backup)
+ except FileNotFoundError as exc:
+ if exc.filename != filename:
+ raise # re-raise
+ backup = None
+ return backup
+
+
+##################################
+# find files
+
+def match_glob(filename, pattern):
+ if fnmatch.fnmatch(filename, pattern):
+ return True
+
+ # fnmatch doesn't handle ** quite right. It will not match the
+ # following:
+ #
+ # ('x/spam.py', 'x/**/*.py')
+ # ('spam.py', '**/*.py')
+ #
+ # though it *will* match the following:
+ #
+ # ('x/y/spam.py', 'x/**/*.py')
+ # ('x/spam.py', '**/*.py')
+
+ if '**/' not in pattern:
+ return False
+
+ # We only accommodate the single-"**" case.
+ return fnmatch.fnmatch(filename, pattern.replace('**/', '', 1))
+
+
+def iter_filenames(filenames, *,
+ start=None,
+ include=None,
+ exclude=None,
+ ):
+ onempty = Exception('no filenames provided')
+ for filename, solo in iter_many(filenames, onempty):
+ check, start = _get_check(filename, start, include, exclude)
+ yield filename, check, solo
+# filenames = iter(filenames or ())
+# try:
+# first = next(filenames)
+# except StopIteration:
+# raise Exception('no filenames provided')
+# try:
+# second = next(filenames)
+# except StopIteration:
+# check, _ = _get_check(first, start, include, exclude)
+# yield first, check, False
+# return
+#
+# check, start = _get_check(first, start, include, exclude)
+# yield first, check, True
+# check, start = _get_check(second, start, include, exclude)
+# yield second, check, True
+# for filename in filenames:
+# check, start = _get_check(filename, start, include, exclude)
+# yield filename, check, True
+
+
+def expand_filenames(filenames):
+ for filename in filenames:
+ # XXX Do we need to use glob.escape (a la commit 9355868458, GH-20994)?
+ if '**/' in filename:
+ yield from glob.glob(filename.replace('**/', ''))
+ yield from glob.glob(filename)
+
+
+def _get_check(filename, start, include, exclude):
+ if start and filename != start:
+ return (lambda: '<skipped>'), start
+ else:
+ def check():
+ if _is_excluded(filename, exclude, include):
+ return '<excluded>'
+ return None
+ return check, None
+
+
+def _is_excluded(filename, exclude, include):
+ if include:
+ for included in include:
+ if match_glob(filename, included):
+ return False
+ return True
+ elif exclude:
+ for excluded in exclude:
+ if match_glob(filename, excluded):
+ return True
+ return False
+ else:
+ return False
+
+
+def _walk_tree(root, *,
+ _walk=os.walk,
+ ):
+ # A wrapper around os.walk that resolves the filenames.
+ for parent, _, names in _walk(root):
+ for name in names:
+ yield os.path.join(parent, name)
+
+
+def walk_tree(root, *,
+ suffix=None,
+ walk=_walk_tree,
+ ):
+ """Yield each file in the tree under the given directory name.
+
+ If "suffix" is provided then only files with that suffix will
+ be included.
+ """
+ if suffix and not isinstance(suffix, str):
+ raise ValueError('suffix must be a string')
+
+ for filename in walk(root):
+ if suffix and not filename.endswith(suffix):
+ continue
+ yield filename
+
+
+def glob_tree(root, *,
+ suffix=None,
+ _glob=glob.iglob,
+ ):
+ """Yield each file in the tree under the given directory name.
+
+ If "suffix" is provided then only files with that suffix will
+ be included.
+ """
+ suffix = suffix or ''
+ if not isinstance(suffix, str):
+ raise ValueError('suffix must be a string')
+
+ for filename in _glob(f'{root}/*{suffix}'):
+ yield filename
+ for filename in _glob(f'{root}/**/*{suffix}'):
+ yield filename
+
+
+def iter_files(root, suffix=None, relparent=None, *,
+ get_files=os.walk,
+ _glob=glob_tree,
+ _walk=walk_tree,
+ ):
+ """Yield each file in the tree under the given directory name.
+
+ If "root" is a non-string iterable then do the same for each of
+ those trees.
+
+ If "suffix" is provided then only files with that suffix will
+ be included.
+
+    If "relparent" is provided then it is used to resolve each
+ filename as a relative path.
+ """
+ if not isinstance(root, str):
+ roots = root
+ for root in roots:
+ yield from iter_files(root, suffix, relparent,
+ get_files=get_files,
+ _glob=_glob, _walk=_walk)
+ return
+
+ # Use the right "walk" function.
+ if get_files in (glob.glob, glob.iglob, glob_tree):
+ get_files = _glob
+ else:
+ _files = _walk_tree if get_files in (os.walk, walk_tree) else get_files
+ get_files = (lambda *a, **k: _walk(*a, walk=_files, **k))
+
+ # Handle a single suffix.
+ if suffix and not isinstance(suffix, str):
+ filenames = get_files(root)
+ suffix = tuple(suffix)
+ else:
+ filenames = get_files(root, suffix=suffix)
+ suffix = None
+
+ for filename in filenames:
+ if suffix and not isinstance(suffix, str): # multiple suffixes
+ if not filename.endswith(suffix):
+ continue
+ if relparent:
+ filename = os.path.relpath(filename, relparent)
+ yield filename
+
+
+def iter_files_by_suffix(root, suffixes, relparent=None, *,
+ walk=walk_tree,
+ _iter_files=iter_files,
+ ):
+ """Yield each file in the tree that has the given suffixes.
+
+ Unlike iter_files(), the results are in the original suffix order.
+ """
+ if isinstance(suffixes, str):
+ suffixes = [suffixes]
+ # XXX Ignore repeated suffixes?
+ for suffix in suffixes:
+ yield from _iter_files(root, suffix, relparent)
+
+
+##################################
+# file info
+
+# XXX posix-only?
+
+S_IRANY = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
+S_IWANY = stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
+S_IXANY = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
+
+
+def is_readable(file, *, user=None, check=False):
+ filename, st, mode = _get_file_info(file)
+ if check:
+ try:
+ okay = _check_file(filename, S_IRANY)
+ except NotImplementedError:
+ okay = NotImplemented
+ if okay is not NotImplemented:
+ return okay
+ # Fall back to checking the mode.
+ return _check_mode(st, mode, S_IRANY, user)
+
+
+def is_writable(file, *, user=None, check=False):
+ filename, st, mode = _get_file_info(file)
+ if check:
+ try:
+ okay = _check_file(filename, S_IWANY)
+ except NotImplementedError:
+ okay = NotImplemented
+ if okay is not NotImplemented:
+ return okay
+ # Fall back to checking the mode.
+ return _check_mode(st, mode, S_IWANY, user)
+
+
+def is_executable(file, *, user=None, check=False):
+ filename, st, mode = _get_file_info(file)
+ if check:
+ try:
+ okay = _check_file(filename, S_IXANY)
+ except NotImplementedError:
+ okay = NotImplemented
+ if okay is not NotImplemented:
+ return okay
+ # Fall back to checking the mode.
+ return _check_mode(st, mode, S_IXANY, user)
+
+
+def _get_file_info(file):
+ filename = st = mode = None
+ if isinstance(file, int):
+ mode = file
+ elif isinstance(file, os.stat_result):
+ st = file
+ else:
+ if isinstance(file, str):
+ filename = file
+ elif hasattr(file, 'name') and os.path.exists(file.name):
+ filename = file.name
+ else:
+ raise NotImplementedError(file)
+ st = os.stat(filename)
+ return filename, st, mode or st.st_mode
+
+
+def _check_file(filename, check):
+ if not isinstance(filename, str):
+ raise Exception(f'filename required to check file, got {filename}')
+ if check & S_IRANY:
+ flags = os.O_RDONLY
+ elif check & S_IWANY:
+ flags = os.O_WRONLY
+ elif check & S_IXANY:
+ # We can worry about S_IXANY later
+ return NotImplemented
+ else:
+ raise NotImplementedError(check)
+
+ try:
+ fd = os.open(filename, flags)
+ except PermissionError:
+ return False
+ # We do not ignore other exceptions.
+ else:
+ os.close(fd)
+ return True
+
+
+def _get_user_info(user):
+ import pwd
+ username = uid = gid = groups = None
+ if user is None:
+ uid = os.geteuid()
+ #username = os.getlogin()
+ username = pwd.getpwuid(uid)[0]
+ gid = os.getgid()
+ groups = os.getgroups()
+ else:
+ if isinstance(user, int):
+ uid = user
+ entry = pwd.getpwuid(uid)
+ username = entry.pw_name
+ elif isinstance(user, str):
+ username = user
+ entry = pwd.getpwnam(username)
+ uid = entry.pw_uid
+ else:
+ raise NotImplementedError(user)
+ gid = entry.pw_gid
+        groups = os.getgrouplist(username, gid)
+ return username, uid, gid, groups
+
+
+def _check_mode(st, mode, check, user):
+ orig = check
+ _, uid, gid, groups = _get_user_info(user)
+ if check & S_IRANY:
+ check -= S_IRANY
+ matched = False
+ if mode & stat.S_IRUSR:
+ if st.st_uid == uid:
+ matched = True
+ if mode & stat.S_IRGRP:
+            if st.st_gid == gid or st.st_gid in groups:
+ matched = True
+ if mode & stat.S_IROTH:
+ matched = True
+ if not matched:
+ return False
+ if check & S_IWANY:
+ check -= S_IWANY
+ matched = False
+ if mode & stat.S_IWUSR:
+ if st.st_uid == uid:
+ matched = True
+ if mode & stat.S_IWGRP:
+            if st.st_gid == gid or st.st_gid in groups:
+ matched = True
+ if mode & stat.S_IWOTH:
+ matched = True
+ if not matched:
+ return False
+ if check & S_IXANY:
+ check -= S_IXANY
+ matched = False
+ if mode & stat.S_IXUSR:
+ if st.st_uid == uid:
+ matched = True
+ if mode & stat.S_IXGRP:
+            if st.st_gid == gid or st.st_gid in groups:
+ matched = True
+ if mode & stat.S_IXOTH:
+ matched = True
+ if not matched:
+ return False
+ if check:
+ raise NotImplementedError((orig, check))
+ return True
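
A quick sketch of how the helpers above fit together (the paths are
illustrative, and this assumes c_common is importable):

    from c_common import fsutil

    # match_glob() special-cases a single "**/", which fnmatch alone
    # would not match for these inputs:
    assert fsutil.match_glob('x/spam.c', 'x/**/*.c')
    assert fsutil.match_glob('spam.c', '**/*.c')

    # Walk a tree for C sources, resolving each path relative to it:
    for filename in fsutil.iter_files('Include', fsutil.C_SOURCE_SUFFIXES,
                                      relparent='Include'):
        print(filename)
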
diff --git a/Tools/c-analyzer/c_common/info.py b/Tools/c-analyzer/c_common/info.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/Tools/c-analyzer/c_common/info.py
diff --git a/Tools/c-analyzer/c_common/iterutil.py b/Tools/c-analyzer/c_common/iterutil.py
new file mode 100644
index 0000000..6ded105
--- /dev/null
+++ b/Tools/c-analyzer/c_common/iterutil.py
@@ -0,0 +1,48 @@
+
+_NOT_SET = object()
+
+
+def peek_and_iter(items):
+ if not items:
+ return None, None
+ items = iter(items)
+ try:
+ peeked = next(items)
+ except StopIteration:
+ return None, None
+ def chain():
+ yield peeked
+ yield from items
+ return chain(), peeked
+
+
+def iter_many(items, onempty=None):
+ if not items:
+ if onempty is None:
+ return
+ if not callable(onempty):
+            raise onempty
+ items = onempty(items)
+ yield from iter_many(items, onempty=None)
+ return
+ items = iter(items)
+ try:
+ first = next(items)
+ except StopIteration:
+ if onempty is None:
+ return
+ if not callable(onempty):
+            raise onempty
+ items = onempty(items)
+ yield from iter_many(items, onempty=None)
+ else:
+ try:
+ second = next(items)
+ except StopIteration:
+ yield first, False
+ return
+ else:
+ yield first, True
+ yield second, True
+ for item in items:
+ yield item, True
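
In short, iter_many() tags each item with a flag saying whether the
iterable held more than one item; the CLI helpers use that flag to
decide when to print per-file headers. A tiny sketch:

    from c_common.iterutil import iter_many

    assert list(iter_many(['spam'])) == [('spam', False)]
    assert list(iter_many(['spam', 'eggs'])) == [('spam', True),
                                                 ('eggs', True)]
    # A non-callable "onempty" is raised if the iterable is empty:
    #   list(iter_many([], Exception('no items')))  # raises Exception
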
diff --git a/Tools/c-analyzer/c_common/logging.py b/Tools/c-analyzer/c_common/logging.py
new file mode 100644
index 0000000..12398f7
--- /dev/null
+++ b/Tools/c-analyzer/c_common/logging.py
@@ -0,0 +1,63 @@
+import logging
+import sys
+
+
+VERBOSITY = 3
+
+
+# The root logger for the whole top-level package:
+_logger = logging.getLogger(__name__.rpartition('.')[0])
+
+
+def configure_logger(logger, verbosity=VERBOSITY, *,
+ logfile=None,
+ maxlevel=logging.CRITICAL,
+ ):
+ level = max(1, # 0 disables it, so we use the next lowest.
+ min(maxlevel,
+ maxlevel - verbosity * 10))
+ logger.setLevel(level)
+ #logger.propagate = False
+
+ if not logger.handlers:
+ if logfile:
+ handler = logging.FileHandler(logfile)
+ else:
+ handler = logging.StreamHandler(sys.stdout)
+ handler.setLevel(level)
+ #handler.setFormatter(logging.Formatter())
+ logger.addHandler(handler)
+
+ # In case the provided logger is in a sub-package...
+ if logger is not _logger:
+ configure_logger(
+ _logger,
+ verbosity,
+ logfile=logfile,
+ maxlevel=maxlevel,
+ )
+
+
+def hide_emit_errors():
+ """Ignore errors while emitting log entries.
+
+    Rather than printing a message describing the error, we show nothing.
+ """
+ # For now we simply ignore all exceptions. If we wanted to ignore
+ # specific ones (e.g. BrokenPipeError) then we would need to use
+ # a Handler subclass with a custom handleError() method.
+ orig = logging.raiseExceptions
+ logging.raiseExceptions = False
+ def restore():
+ logging.raiseExceptions = orig
+ return restore
+
+
+class Printer:
+ def __init__(self, verbosity=VERBOSITY):
+ self.verbosity = verbosity
+
+ def info(self, *args, **kwargs):
+ if self.verbosity < 3:
+ return
+ print(*args, **kwargs)
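
To make the verbosity mapping concrete: with the default maxlevel of
logging.CRITICAL (50), each verbosity step lowers the threshold by 10,
so the default verbosity of 3 gives level 20 (INFO) and 4 gives 10
(DEBUG), clamped to a minimum of 1. A small sketch (the logger name is
hypothetical):

    import logging
    from c_common import logging as loggingutil

    logger = logging.getLogger('c_common.demo')
    loggingutil.configure_logger(logger, verbosity=4)
    logger.debug('visible at verbosity 4')  # threshold is now 10 (DEBUG)
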
diff --git a/Tools/c-analyzer/c_common/misc.py b/Tools/c-analyzer/c_common/misc.py
new file mode 100644
index 0000000..bfd503a
--- /dev/null
+++ b/Tools/c-analyzer/c_common/misc.py
@@ -0,0 +1,7 @@
+
+class Labeled:
+ __slots__ = ('_label',)
+ def __init__(self, label):
+ self._label = label
+ def __repr__(self):
+ return f'<{self._label}>'
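
Labeled is just a sentinel with a readable repr, e.g. (the name here is
illustrative):

    from c_common.misc import Labeled

    IGNORED = Labeled('IGNORED')
    print(IGNORED)   # prints: <IGNORED>
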
diff --git a/Tools/c-analyzer/c_common/scriptutil.py b/Tools/c-analyzer/c_common/scriptutil.py
new file mode 100644
index 0000000..939a850
--- /dev/null
+++ b/Tools/c-analyzer/c_common/scriptutil.py
@@ -0,0 +1,577 @@
+import argparse
+import contextlib
+import fnmatch
+import logging
+import os
+import os.path
+import shutil
+import sys
+
+from . import fsutil, strutil, iterutil, logging as loggingutil
+
+
+def get_prog(spec=None, *, absolute=False, allowsuffix=True):
+ if spec is None:
+ _, spec = _find_script()
+ # This is more natural for prog than __file__ would be.
+ filename = sys.argv[0]
+ elif isinstance(spec, str):
+ filename = os.path.normpath(spec)
+ spec = None
+ else:
+ filename = spec.origin
+ if _is_standalone(filename):
+ # Check if "installed".
+ if allowsuffix or not filename.endswith('.py'):
+ basename = os.path.basename(filename)
+ found = shutil.which(basename)
+ if found:
+ script = os.path.abspath(filename)
+ found = os.path.abspath(found)
+ if os.path.normcase(script) == os.path.normcase(found):
+ return basename
+ # It is only "standalone".
+ if absolute:
+ filename = os.path.abspath(filename)
+ return filename
+ elif spec is not None:
+ module = spec.name
+ if module.endswith('.__main__'):
+ module = module[:-9]
+ return f'{sys.executable} -m {module}'
+ else:
+ if absolute:
+ filename = os.path.abspath(filename)
+ return f'{sys.executable} {filename}'
+
+
+def _find_script():
+ frame = sys._getframe(2)
+ while frame.f_globals['__name__'] != '__main__':
+ frame = frame.f_back
+
+ # This should match sys.argv[0].
+ filename = frame.f_globals['__file__']
+    # This will be None if -m wasn't used.
+ spec = frame.f_globals['__spec__']
+ return filename, spec
+
+
+def is_installed(filename, *, allowsuffix=True):
+ if not allowsuffix and filename.endswith('.py'):
+ return False
+    filename = os.path.abspath(os.path.normpath(filename))
+ found = shutil.which(os.path.basename(filename))
+ if not found:
+ return False
+ if found != filename:
+ return False
+ return _is_standalone(filename)
+
+
+def is_standalone(filename):
+    filename = os.path.abspath(os.path.normpath(filename))
+ return _is_standalone(filename)
+
+
+def _is_standalone(filename):
+ return fsutil.is_executable(filename)
+
+
+##################################
+# logging
+
+VERBOSITY = 3
+
+TRACEBACK = os.environ.get('SHOW_TRACEBACK', '').strip()
+TRACEBACK = bool(TRACEBACK and TRACEBACK.upper() not in ('0', 'FALSE', 'NO'))
+
+
+logger = logging.getLogger(__name__)
+
+
+def configure_logger(verbosity, logger=None, **kwargs):
+ if logger is None:
+ # Configure the root logger.
+ logger = logging.getLogger()
+ loggingutil.configure_logger(logger, verbosity, **kwargs)
+
+
+##################################
+# selections
+
+class UnsupportedSelectionError(Exception):
+ def __init__(self, values, possible):
+ self.values = tuple(values)
+ self.possible = tuple(possible)
+ super().__init__(f'unsupported selections {self.unique}')
+
+ @property
+ def unique(self):
+ return tuple(sorted(set(self.values)))
+
+
+def normalize_selection(selected: str, *, possible=None):
+ if selected in (None, True, False):
+ return selected
+ elif isinstance(selected, str):
+ selected = [selected]
+ elif not selected:
+ return ()
+
+ unsupported = []
+ _selected = set()
+ for item in selected:
+ if not item:
+ continue
+ for value in item.strip().replace(',', ' ').split():
+ if not value:
+ continue
+ # XXX Handle subtraction (leading "-").
+ if possible and value not in possible and value != 'all':
+ unsupported.append(value)
+ _selected.add(value)
+ if unsupported:
+ raise UnsupportedSelectionError(unsupported, tuple(possible))
+ if 'all' in _selected:
+ return True
+    return frozenset(_selected)
+
+
+##################################
+# CLI parsing helpers
+
+class CLIArgSpec(tuple):
+ def __new__(cls, *args, **kwargs):
+ return super().__new__(cls, (args, kwargs))
+
+ def __repr__(self):
+ args, kwargs = self
+ args = [repr(arg) for arg in args]
+ for name, value in kwargs.items():
+ args.append(f'{name}={value!r}')
+ return f'{type(self).__name__}({", ".join(args)})'
+
+ def __call__(self, parser, *, _noop=(lambda a: None)):
+ self.apply(parser)
+ return _noop
+
+ def apply(self, parser):
+ args, kwargs = self
+ parser.add_argument(*args, **kwargs)
+
+
+def apply_cli_argspecs(parser, specs):
+ processors = []
+ for spec in specs:
+ if callable(spec):
+ procs = spec(parser)
+ _add_procs(processors, procs)
+ else:
+ args, kwargs = spec
+            parser.add_argument(*args, **kwargs)
+ return processors
+
+
+def _add_procs(flattened, procs):
+ # XXX Fail on non-empty, non-callable procs?
+ if not procs:
+ return
+ if callable(procs):
+ flattened.append(procs)
+ else:
+ #processors.extend(p for p in procs if callable(p))
+ for proc in procs:
+ _add_procs(flattened, proc)
+
+
+def add_verbosity_cli(parser):
+ parser.add_argument('-q', '--quiet', action='count', default=0)
+ parser.add_argument('-v', '--verbose', action='count', default=0)
+
+ def process_args(args):
+ ns = vars(args)
+ key = 'verbosity'
+ if key in ns:
+ parser.error(f'duplicate arg {key!r}')
+ ns[key] = max(0, VERBOSITY + ns.pop('verbose') - ns.pop('quiet'))
+ return key
+ return process_args
+
+
+def add_traceback_cli(parser):
+ parser.add_argument('--traceback', '--tb', action='store_true',
+ default=TRACEBACK)
+ parser.add_argument('--no-traceback', '--no-tb', dest='traceback',
+ action='store_const', const=False)
+
+ def process_args(args):
+ ns = vars(args)
+ key = 'traceback_cm'
+ if key in ns:
+ parser.error(f'duplicate arg {key!r}')
+ showtb = ns.pop('traceback')
+
+ @contextlib.contextmanager
+ def traceback_cm():
+ restore = loggingutil.hide_emit_errors()
+ try:
+ yield
+ except BrokenPipeError:
+ # It was piped to "head" or something similar.
+ pass
+ except NotImplementedError:
+ raise # re-raise
+ except Exception as exc:
+ if not showtb:
+ sys.exit(f'ERROR: {exc}')
+ raise # re-raise
+ except KeyboardInterrupt:
+ if not showtb:
+ sys.exit('\nINTERRUPTED')
+ raise # re-raise
+ except BaseException as exc:
+ if not showtb:
+ sys.exit(f'{type(exc).__name__}: {exc}')
+ raise # re-raise
+ finally:
+ restore()
+ ns[key] = traceback_cm()
+ return key
+ return process_args
+
+
+def add_sepval_cli(parser, opt, dest, choices, *, sep=',', **kwargs):
+# if opt is True:
+# parser.add_argument(f'--{dest}', action='append', **kwargs)
+# elif isinstance(opt, str) and opt.startswith('-'):
+# parser.add_argument(opt, dest=dest, action='append', **kwargs)
+# else:
+# arg = dest if not opt else opt
+# kwargs.setdefault('nargs', '+')
+# parser.add_argument(arg, dest=dest, action='append', **kwargs)
+ if not isinstance(opt, str):
+ parser.error(f'opt must be a string, got {opt!r}')
+ elif opt.startswith('-'):
+ parser.add_argument(opt, dest=dest, action='append', **kwargs)
+ else:
+ kwargs.setdefault('nargs', '+')
+ #kwargs.setdefault('metavar', opt.upper())
+ parser.add_argument(opt, dest=dest, action='append', **kwargs)
+
+ def process_args(args):
+ ns = vars(args)
+
+ # XXX Use normalize_selection()?
+ if isinstance(ns[dest], str):
+ ns[dest] = [ns[dest]]
+ selections = []
+ for many in ns[dest] or ():
+ for value in many.split(sep):
+ if value not in choices:
+ parser.error(f'unknown {dest} {value!r}')
+ selections.append(value)
+ ns[dest] = selections
+ return process_args
+
+
+def add_files_cli(parser, *, excluded=None, nargs=None):
+ process_files = add_file_filtering_cli(parser, excluded=excluded)
+ parser.add_argument('filenames', nargs=nargs or '+', metavar='FILENAME')
+ return [
+ process_files,
+ ]
+
+
+def add_file_filtering_cli(parser, *, excluded=None):
+ parser.add_argument('--start')
+ parser.add_argument('--include', action='append')
+ parser.add_argument('--exclude', action='append')
+
+ excluded = tuple(excluded or ())
+
+ def process_args(args):
+ ns = vars(args)
+ key = 'iter_filenames'
+ if key in ns:
+ parser.error(f'duplicate arg {key!r}')
+
+ _include = tuple(ns.pop('include') or ())
+ _exclude = excluded + tuple(ns.pop('exclude') or ())
+ kwargs = dict(
+ start=ns.pop('start'),
+ include=tuple(_parse_files(_include)),
+ exclude=tuple(_parse_files(_exclude)),
+ # We use the default for "show_header"
+ )
+ ns[key] = (lambda files: fsutil.iter_filenames(files, **kwargs))
+ return process_args
+
+
+def _parse_files(filenames):
+ for filename, _ in strutil.parse_entries(filenames):
+ yield filename.strip()
+
+
+def add_failure_filtering_cli(parser, pool, *, default=False):
+ parser.add_argument('--fail', action='append',
+ metavar=f'"{{all|{"|".join(sorted(pool))}}},..."')
+ parser.add_argument('--no-fail', dest='fail', action='store_const', const=())
+
+ def process_args(args):
+ ns = vars(args)
+
+ fail = ns.pop('fail')
+ try:
+ fail = normalize_selection(fail, possible=pool)
+ except UnsupportedSelectionError as exc:
+ parser.error(f'invalid --fail values: {", ".join(exc.unique)}')
+ else:
+ if fail is None:
+ fail = default
+
+ if fail is True:
+ def ignore_exc(_exc):
+ return False
+ elif fail is False:
+ def ignore_exc(_exc):
+ return True
+ else:
+ def ignore_exc(exc):
+ for err in fail:
+ if type(exc) == pool[err]:
+ return False
+ else:
+ return True
+ args.ignore_exc = ignore_exc
+ return process_args
+
+
+def add_kind_filtering_cli(parser, *, default=None):
+ parser.add_argument('--kinds', action='append')
+
+ def process_args(args):
+ ns = vars(args)
+
+ kinds = []
+ for kind in ns.pop('kinds') or default or ():
+ kinds.extend(kind.strip().replace(',', ' ').split())
+
+ if not kinds:
+ match_kind = (lambda k: True)
+ else:
+ included = set()
+ excluded = set()
+ for kind in kinds:
+ if kind.startswith('-'):
+ kind = kind[1:]
+ excluded.add(kind)
+ if kind in included:
+ included.remove(kind)
+ else:
+ included.add(kind)
+ if kind in excluded:
+ excluded.remove(kind)
+ if excluded:
+ if included:
+ ... # XXX fail?
+ def match_kind(kind, *, _excluded=excluded):
+ return kind not in _excluded
+ else:
+ def match_kind(kind, *, _included=included):
+ return kind in _included
+ args.match_kind = match_kind
+ return process_args
+
+
+COMMON_CLI = [
+ add_verbosity_cli,
+ add_traceback_cli,
+ #add_dryrun_cli,
+]
+
+
+def add_commands_cli(parser, commands, *, commonspecs=COMMON_CLI, subset=None):
+ arg_processors = {}
+ if isinstance(subset, str):
+ cmdname = subset
+ try:
+ _, argspecs, _ = commands[cmdname]
+ except KeyError:
+ raise ValueError(f'unsupported subset {subset!r}')
+ parser.set_defaults(cmd=cmdname)
+ arg_processors[cmdname] = _add_cmd_cli(parser, commonspecs, argspecs)
+ else:
+ if subset is None:
+ cmdnames = subset = list(commands)
+ elif not subset:
+ raise NotImplementedError
+ elif isinstance(subset, set):
+ cmdnames = [k for k in commands if k in subset]
+ subset = sorted(subset)
+ else:
+ cmdnames = [n for n in subset if n in commands]
+ if len(cmdnames) < len(subset):
+ bad = tuple(n for n in subset if n not in commands)
+ raise ValueError(f'unsupported subset {bad}')
+
+ common = argparse.ArgumentParser(add_help=False)
+ common_processors = apply_cli_argspecs(common, commonspecs)
+ subs = parser.add_subparsers(dest='cmd')
+ for cmdname in cmdnames:
+ description, argspecs, _ = commands[cmdname]
+ sub = subs.add_parser(
+ cmdname,
+ description=description,
+ parents=[common],
+ )
+ cmd_processors = _add_cmd_cli(sub, (), argspecs)
+ arg_processors[cmdname] = common_processors + cmd_processors
+ return arg_processors
+
+
+def _add_cmd_cli(parser, commonspecs, argspecs):
+ processors = []
+ argspecs = list(commonspecs or ()) + list(argspecs or ())
+ for argspec in argspecs:
+ if callable(argspec):
+ procs = argspec(parser)
+ _add_procs(processors, procs)
+ else:
+ if not argspec:
+ raise NotImplementedError
+ args = list(argspec)
+ if not isinstance(args[-1], str):
+ kwargs = args.pop()
+ if not isinstance(args[0], str):
+ try:
+ args, = args
+ except (TypeError, ValueError):
+ parser.error(f'invalid cmd args {argspec!r}')
+ else:
+ kwargs = {}
+ parser.add_argument(*args, **kwargs)
+ # There will be nothing to process.
+ return processors
+
+
+def _flatten_processors(processors):
+ for proc in processors:
+ if proc is None:
+ continue
+ if callable(proc):
+ yield proc
+ else:
+ yield from _flatten_processors(proc)
+
+
+def process_args(args, processors, *, keys=None):
+ processors = _flatten_processors(processors)
+ ns = vars(args)
+ extracted = {}
+ if keys is None:
+ for process_args in processors:
+ for key in process_args(args):
+ extracted[key] = ns.pop(key)
+ else:
+ remainder = set(keys)
+ for process_args in processors:
+ hanging = process_args(args)
+ if isinstance(hanging, str):
+ hanging = [hanging]
+ for key in hanging or ():
+ if key not in remainder:
+ raise NotImplementedError(key)
+ extracted[key] = ns.pop(key)
+ remainder.remove(key)
+ if remainder:
+ raise NotImplementedError(sorted(remainder))
+ return extracted
+
+
+def process_args_by_key(args, processors, keys):
+ extracted = process_args(args, processors, keys=keys)
+ return [extracted[key] for key in keys]
+
+
+##################################
+# commands
+
+def set_command(name, add_cli):
+ """A decorator factory to set CLI info."""
+ def decorator(func):
+ if hasattr(func, '__cli__'):
+            raise Exception('already set')
+ func.__cli__ = (name, add_cli)
+ return func
+ return decorator
+
+
+##################################
+# main() helpers
+
+def filter_filenames(filenames, iter_filenames=None):
+ for filename, check, _ in _iter_filenames(filenames, iter_filenames):
+ if (reason := check()):
+ logger.debug(f'{filename}: {reason}')
+ continue
+ yield filename
+
+
+def main_for_filenames(filenames, iter_filenames=None):
+ for filename, check, show in _iter_filenames(filenames, iter_filenames):
+ if show:
+ print()
+ print('-------------------------------------------')
+ print(filename)
+ if (reason := check()):
+ print(reason)
+ continue
+ yield filename
+
+
+def _iter_filenames(filenames, iter_files):
+ if iter_files is None:
+ iter_files = fsutil.iter_filenames
+ yield from iter_files(filenames)
+ return
+
+ onempty = Exception('no filenames provided')
+ items = iter_files(filenames)
+ items, peeked = iterutil.peek_and_iter(items)
+ if not items:
+ raise onempty
+ if isinstance(peeked, str):
+        check = (lambda: None)  # Nothing to check, so never skip.
+ for filename, ismany in iterutil.iter_many(items, onempty):
+ yield filename, check, ismany
+ elif len(peeked) == 3:
+ yield from items
+ else:
+ raise NotImplementedError
+
+
+def iter_marks(mark='.', *, group=5, groups=2, lines=10, sep=' '):
+ mark = mark or ''
+ sep = f'{mark}{sep}' if sep else mark
+ end = f'{mark}{os.linesep}'
+ div = os.linesep
+ perline = group * groups
+ perlines = perline * lines
+
+ if perline == 1:
+ yield end
+ elif group == 1:
+ yield sep
+
+ count = 1
+ while True:
+ if count % perline == 0:
+ yield end
+ if count % perlines == 0:
+ yield div
+ elif count % group == 0:
+ yield sep
+ else:
+ yield mark
+ count += 1
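
To show how these pieces compose, here is a hedged sketch of a script
entry point wired up with the helpers above (the script itself is
hypothetical):

    import argparse
    from c_common import scriptutil

    def parse_args(argv=None):
        parser = argparse.ArgumentParser(prog=scriptutil.get_prog())
        processors = [
            scriptutil.add_verbosity_cli(parser),
            scriptutil.add_traceback_cli(parser),
        ]
        args = parser.parse_args(argv)
        # Pop the derived values out of the namespace, in a fixed order.
        verbosity, traceback_cm = scriptutil.process_args_by_key(
            args, processors, ['verbosity', 'traceback_cm'])
        return verbosity, traceback_cm

    if __name__ == '__main__':
        verbosity, traceback_cm = parse_args()
        scriptutil.configure_logger(verbosity)
        with traceback_cm:
            ...  # the real work goes here
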
diff --git a/Tools/c-analyzer/c_common/show.py b/Tools/c-analyzer/c_common/show.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/Tools/c-analyzer/c_common/show.py
diff --git a/Tools/c-analyzer/c_common/strutil.py b/Tools/c-analyzer/c_common/strutil.py
new file mode 100644
index 0000000..e7535d4
--- /dev/null
+++ b/Tools/c-analyzer/c_common/strutil.py
@@ -0,0 +1,42 @@
+import logging
+
+
+logger = logging.getLogger(__name__)
+
+
+def unrepr(value):
+ raise NotImplementedError
+
+
+def parse_entries(entries, *, ignoresep=None):
+ for entry in entries:
+ if ignoresep and ignoresep in entry:
+ subentries = [entry]
+ else:
+ subentries = entry.strip().replace(',', ' ').split()
+ for item in subentries:
+ if item.startswith('+'):
+ filename = item[1:]
+ try:
+ infile = open(filename)
+ except FileNotFoundError:
+ logger.debug(f'ignored in parse_entries(): +{filename}')
+                    continue
+ with infile:
+ # We read the entire file here to ensure the file
+ # gets closed sooner rather than later. Note that
+ # the file would stay open if this iterator is never
+                    # exhausted.
+ lines = infile.read().splitlines()
+ for line in _iter_significant_lines(lines):
+ yield line, filename
+ else:
+ yield item, None
+
+
+def _iter_significant_lines(lines):
+ for line in lines:
+ line = line.partition('#')[0]
+ if not line.strip():
+ continue
+ yield line
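
For example (the filenames are illustrative), parse_entries() splits
comma- or space-separated entries and treats a leading "+" as
indirection through a file of entries:

    from c_common.strutil import parse_entries

    list(parse_entries(['spam.c,eggs.c']))
    # -> [('spam.c', None), ('eggs.c', None)]

    list(parse_entries(['+filenames.txt']))
    # -> one (line, 'filenames.txt') pair per significant
    #    (non-blank, non-comment) line in that file
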
diff --git a/Tools/c-analyzer/c_common/tables.py b/Tools/c-analyzer/c_common/tables.py
new file mode 100644
index 0000000..70a230a
--- /dev/null
+++ b/Tools/c-analyzer/c_common/tables.py
@@ -0,0 +1,213 @@
+import csv
+
+from . import NOT_SET, strutil, fsutil
+
+
+EMPTY = '-'
+UNKNOWN = '???'
+
+
+def parse_markers(markers, default=None):
+ if markers is NOT_SET:
+ return default
+ if not markers:
+ return None
+ if type(markers) is not str:
+ return markers
+ if markers == markers[0] * len(markers):
+ return [markers]
+ return list(markers)
+
+
+def fix_row(row, **markers):
+ if isinstance(row, str):
+ raise NotImplementedError(row)
+ empty = parse_markers(markers.pop('empty', ('-',)))
+ unknown = parse_markers(markers.pop('unknown', ('???',)))
+ row = (val if val else None for val in row)
+ if not empty:
+ if not unknown:
+ return row
+ return (UNKNOWN if val in unknown else val for val in row)
+ elif not unknown:
+ return (EMPTY if val in empty else val for val in row)
+ return (EMPTY if val in empty else (UNKNOWN if val in unknown else val)
+ for val in row)
+
+
+def _fix_read_default(row):
+ for value in row:
+ yield value.strip()
+
+
+def _fix_write_default(row, empty=''):
+ for value in row:
+ yield empty if value is None else str(value)
+
+
+def _normalize_fix_read(fix):
+ if fix is None:
+ fix = ''
+ if callable(fix):
+ def fix_row(row):
+ values = fix(row)
+ return _fix_read_default(values)
+ elif isinstance(fix, str):
+ def fix_row(row):
+ values = _fix_read_default(row)
+ return (None if v == fix else v
+ for v in values)
+ else:
+ raise NotImplementedError(fix)
+ return fix_row
+
+
+def _normalize_fix_write(fix, empty=''):
+ if fix is None:
+ fix = empty
+ if callable(fix):
+ def fix_row(row):
+ values = fix(row)
+ return _fix_write_default(values, empty)
+ elif isinstance(fix, str):
+ def fix_row(row):
+ return _fix_write_default(row, fix)
+ else:
+ raise NotImplementedError(fix)
+ return fix_row
+
+
+def read_table(infile, header, *,
+ sep='\t',
+ fix=None,
+ _open=open,
+ _get_reader=csv.reader,
+ ):
+    """Yield each row of the given "sep"-separated (e.g. tab) file."""
+ if isinstance(infile, str):
+ with _open(infile, newline='') as infile:
+ yield from read_table(
+ infile,
+ header,
+ sep=sep,
+ fix=fix,
+ _open=_open,
+ _get_reader=_get_reader,
+ )
+ return
+ lines = strutil._iter_significant_lines(infile)
+
+ # Validate the header.
+ if not isinstance(header, str):
+ header = sep.join(header)
+ try:
+ actualheader = next(lines).strip()
+ except StopIteration:
+ actualheader = ''
+ if actualheader != header:
+ raise ValueError(f'bad header {actualheader!r}')
+
+ fix_row = _normalize_fix_read(fix)
+ for row in _get_reader(lines, delimiter=sep or '\t'):
+ yield tuple(fix_row(row))
+
+
+def write_table(outfile, header, rows, *,
+ sep='\t',
+ fix=None,
+ backup=True,
+ _open=open,
+ _get_writer=csv.writer,
+ ):
+    """Write each of the rows to the given "sep"-separated (e.g. tab) file."""
+ if backup:
+ fsutil.create_backup(outfile, backup)
+ if isinstance(outfile, str):
+ with _open(outfile, 'w', newline='') as outfile:
+ return write_table(
+ outfile,
+ header,
+ rows,
+ sep=sep,
+ fix=fix,
+ backup=backup,
+ _open=_open,
+ _get_writer=_get_writer,
+ )
+
+ if isinstance(header, str):
+ header = header.split(sep or '\t')
+ fix_row = _normalize_fix_write(fix)
+ writer = _get_writer(outfile, delimiter=sep or '\t')
+ writer.writerow(header)
+ for row in rows:
+ writer.writerow(
+ tuple(fix_row(row))
+ )
+
+
+def parse_table(entries, sep, header=None, rawsep=None, *,
+ default=NOT_SET,
+ strict=True,
+ ):
+ header, sep = _normalize_table_file_props(header, sep)
+ if not sep:
+ raise ValueError('missing "sep"')
+
+ ncols = None
+ if header:
+ if strict:
+ ncols = len(header.split(sep))
+ cur_file = None
+ for line, filename in strutil.parse_entries(entries, ignoresep=sep):
+ _sep = sep
+ if filename:
+ if header and cur_file != filename:
+ cur_file = filename
+ # Skip the first line if it's the header.
+ if line.strip() == header:
+ continue
+ else:
+ # We expected the header.
+ raise NotImplementedError((header, line))
+ elif rawsep and sep not in line:
+ _sep = rawsep
+
+ row = _parse_row(line, _sep, ncols, default)
+ if strict and not ncols:
+ ncols = len(row)
+ yield row, filename
+
+
+def parse_row(line, sep, *, ncols=None, default=NOT_SET):
+ if not sep:
+ raise ValueError('missing "sep"')
+ return _parse_row(line, sep, ncols, default)
+
+
+def _parse_row(line, sep, ncols, default):
+ row = tuple(v.strip() for v in line.split(sep))
+ if (ncols or 0) > 0:
+ diff = ncols - len(row)
+ if diff:
+ if default is NOT_SET or diff < 0:
+ raise Exception(f'bad row (expected {ncols} columns, got {row!r})')
+ row += (default,) * diff
+ return row
+
+
+def _normalize_table_file_props(header, sep):
+ if not header:
+ return None, sep
+
+ if not isinstance(header, str):
+ if not sep:
+ raise NotImplementedError(header)
+ header = sep.join(header)
+ elif not sep:
+ for sep in ('\t', ',', ' '):
+ if sep in header:
+ break
+ else:
+ sep = None
+ return header, sep
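
As a final sketch, round-tripping a small tab-separated table with the
helpers above (demo.tsv is a hypothetical path):

    from c_common import tables

    header = ['name', 'kind', 'value']
    rows = [('spam', 'object', 42), ('eggs', None, None)]
    tables.write_table('demo.tsv', header, rows)   # None cells written as ''
    for row in tables.read_table('demo.tsv', header):
        print(row)
    # ('spam', 'object', '42')
    # ('eggs', None, None)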