summaryrefslogtreecommitdiffstats
path: root/Tools/c-analyzer/c_common/fsutil.py
blob: a8cf8d0537e40db48e0e7f34f61a378c492867a8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
import fnmatch
import glob
import os
import os.path
import shutil
import stat

from .iterutil import iter_many


USE_CWD = object()


C_SOURCE_SUFFIXES = ('.c', '.h')


def create_backup(old, backup=None):
    if isinstance(old, str):
        filename = old
    else:
        filename = getattr(old, 'name', None)
    if not filename:
        return None
    if not backup or backup is True:
        backup = f'{filename}.bak'
    try:
        shutil.copyfile(filename, backup)
    except FileNotFoundError as exc:
        if exc.filename != filename:
            raise   # re-raise
        backup = None
    return backup


##################################
# filenames

def fix_filename(filename, relroot=USE_CWD, *,
                 fixroot=True,
                 _badprefix=f'..{os.path.sep}',
                 ):
    """Return a normalized, absolute-path copy of the given filename."""
    if not relroot or relroot is USE_CWD:
        return os.path.abspath(filename)
    if fixroot:
        relroot = os.path.abspath(relroot)
    return _fix_filename(filename, relroot)


def _fix_filename(filename, relroot, *,
                  _badprefix=f'..{os.path.sep}',
                  ):
    orig = filename

    # First we normalize.
    filename = os.path.normpath(filename)
    if filename.startswith(_badprefix):
        raise ValueError(f'bad filename {orig!r} (resolves beyond relative root')

    # Now make sure it is absolute (relative to relroot).
    if not os.path.isabs(filename):
        filename = os.path.join(relroot, filename)
    else:
        relpath = os.path.relpath(filename, relroot)
        if os.path.join(relroot, relpath) != filename:
            raise ValueError(f'expected {relroot!r} as lroot, got {orig!r}')

    return filename


def fix_filenames(filenames, relroot=USE_CWD):
    if not relroot or relroot is USE_CWD:
        filenames = (os.path.abspath(v) for v in filenames)
    else:
        relroot = os.path.abspath(relroot)
        filenames = (_fix_filename(v, relroot) for v in filenames)
    return filenames, relroot


def format_filename(filename, relroot=USE_CWD, *,
                    fixroot=True,
                    normalize=True,
                    _badprefix=f'..{os.path.sep}',
                    ):
    """Return a consistent relative-path representation of the filename."""
    orig = filename
    if normalize:
        filename = os.path.normpath(filename)
    if relroot is None:
        # Otherwise leave it as-is.
        return filename
    elif relroot is USE_CWD:
        # Make it relative to CWD.
        filename = os.path.relpath(filename)
    else:
        # Make it relative to "relroot".
        if fixroot:
            relroot = os.path.abspath(relroot)
        elif not relroot:
            raise ValueError('missing relroot')
        filename = os.path.relpath(filename, relroot)
    if filename.startswith(_badprefix):
        raise ValueError(f'bad filename {orig!r} (resolves beyond relative root')
    return filename


def match_path_tail(path1, path2):
    """Return True if one path ends the other."""
    if path1 == path2:
        return True
    if os.path.isabs(path1):
        if os.path.isabs(path2):
            return False
        return _match_tail(path1, path2)
    elif os.path.isabs(path2):
        return _match_tail(path2, path1)
    else:
        return _match_tail(path1, path2) or _match_tail(path2, path1)


def _match_tail(path, tail):
    assert not os.path.isabs(tail), repr(tail)
    return path.endswith(os.path.sep + tail)


##################################
# find files

def match_glob(filename, pattern):
    if fnmatch.fnmatch(filename, pattern):
        return True

    # fnmatch doesn't handle ** quite right.  It will not match the
    # following:
    #
    #  ('x/spam.py', 'x/**/*.py')
    #  ('spam.py', '**/*.py')
    #
    # though it *will* match the following:
    #
    #  ('x/y/spam.py', 'x/**/*.py')
    #  ('x/spam.py', '**/*.py')

    if '**/' not in pattern:
        return False

    # We only accommodate the single-"**" case.
    return fnmatch.fnmatch(filename, pattern.replace('**/', '', 1))


def process_filenames(filenames, *,
                      start=None,
                      include=None,
                      exclude=None,
                      relroot=USE_CWD,
                      ):
    if relroot and relroot is not USE_CWD:
        relroot = os.path.abspath(relroot)
    if start:
        start = fix_filename(start, relroot, fixroot=False)
    if include:
        include = set(fix_filename(v, relroot, fixroot=False)
                      for v in include)
    if exclude:
        exclude = set(fix_filename(v, relroot, fixroot=False)
                      for v in exclude)

    onempty = Exception('no filenames provided')
    for filename, solo in iter_many(filenames, onempty):
        filename = fix_filename(filename, relroot, fixroot=False)
        relfile = format_filename(filename, relroot, fixroot=False, normalize=False)
        check, start = _get_check(filename, start, include, exclude)
        yield filename, relfile, check, solo


def expand_filenames(filenames):
    for filename in filenames:
        # XXX Do we need to use glob.escape (a la commit 9355868458, GH-20994)?
        if '**/' in filename:
            yield from glob.glob(filename.replace('**/', ''))
        yield from glob.glob(filename)


def _get_check(filename, start, include, exclude):
    if start and filename != start:
        return (lambda: '<skipped>'), start
    else:
        def check():
            if _is_excluded(filename, exclude, include):
                return '<excluded>'
            return None
        return check, None


def _is_excluded(filename, exclude, include):
    if include:
        for included in include:
            if match_glob(filename, included):
                return False
        return True
    elif exclude:
        for excluded in exclude:
            if match_glob(filename, excluded):
                return True
        return False
    else:
        return False


def _walk_tree(root, *,
               _walk=os.walk,
               ):
    # A wrapper around os.walk that resolves the filenames.
    for parent, _, names in _walk(root):
        for name in names:
            yield os.path.join(parent, name)


def walk_tree(root, *,
              suffix=None,
              walk=_walk_tree,
              ):
    """Yield each file in the tree under the given directory name.

    If "suffix" is provided then only files with that suffix will
    be included.
    """
    if suffix and not isinstance(suffix, str):
        raise ValueError('suffix must be a string')

    for filename in walk(root):
        if suffix and not filename.endswith(suffix):
            continue
        yield filename


def glob_tree(root, *,
              suffix=None,
              _glob=glob.iglob,
              ):
    """Yield each file in the tree under the given directory name.

    If "suffix" is provided then only files with that suffix will
    be included.
    """
    suffix = suffix or ''
    if not isinstance(suffix, str):
        raise ValueError('suffix must be a string')

    for filename in _glob(f'{root}/*{suffix}'):
        yield filename
    for filename in _glob(f'{root}/**/*{suffix}'):
        yield filename


def iter_files(root, suffix=None, relparent=None, *,
               get_files=os.walk,
               _glob=glob_tree,
               _walk=walk_tree,
               ):
    """Yield each file in the tree under the given directory name.

    If "root" is a non-string iterable then do the same for each of
    those trees.

    If "suffix" is provided then only files with that suffix will
    be included.

    if "relparent" is provided then it is used to resolve each
    filename as a relative path.
    """
    if not isinstance(root, str):
        roots = root
        for root in roots:
            yield from iter_files(root, suffix, relparent,
                                  get_files=get_files,
                                  _glob=_glob, _walk=_walk)
        return

    # Use the right "walk" function.
    if get_files in (glob.glob, glob.iglob, glob_tree):
        get_files = _glob
    else:
        _files = _walk_tree if get_files in (os.walk, walk_tree) else get_files
        get_files = (lambda *a, **k: _walk(*a, walk=_files, **k))

    # Handle a single suffix.
    if suffix and not isinstance(suffix, str):
        filenames = get_files(root)
        suffix = tuple(suffix)
    else:
        filenames = get_files(root, suffix=suffix)
        suffix = None

    for filename in filenames:
        if suffix and not isinstance(suffix, str):  # multiple suffixes
            if not filename.endswith(suffix):
                continue
        if relparent:
            filename = os.path.relpath(filename, relparent)
        yield filename


def iter_files_by_suffix(root, suffixes, relparent=None, *,
                         walk=walk_tree,
                         _iter_files=iter_files,
                         ):
    """Yield each file in the tree that has the given suffixes.

    Unlike iter_files(), the results are in the original suffix order.
    """
    if isinstance(suffixes, str):
        suffixes = [suffixes]
    # XXX Ignore repeated suffixes?
    for suffix in suffixes:
        yield from _iter_files(root, suffix, relparent)


##################################
# file info

# XXX posix-only?

S_IRANY = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
S_IWANY = stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
S_IXANY = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH


def is_readable(file, *, user=None, check=False):
    filename, st, mode = _get_file_info(file)
    if check:
        try:
            okay = _check_file(filename, S_IRANY)
        except NotImplementedError:
            okay = NotImplemented
        if okay is not NotImplemented:
            return okay
        # Fall back to checking the mode.
    return _check_mode(st, mode, S_IRANY, user)


def is_writable(file, *, user=None, check=False):
    filename, st, mode = _get_file_info(file)
    if check:
        try:
            okay = _check_file(filename, S_IWANY)
        except NotImplementedError:
            okay = NotImplemented
        if okay is not NotImplemented:
            return okay
        # Fall back to checking the mode.
    return _check_mode(st, mode, S_IWANY, user)


def is_executable(file, *, user=None, check=False):
    filename, st, mode = _get_file_info(file)
    if check:
        try:
            okay = _check_file(filename, S_IXANY)
        except NotImplementedError:
            okay = NotImplemented
        if okay is not NotImplemented:
            return okay
        # Fall back to checking the mode.
    return _check_mode(st, mode, S_IXANY, user)


def _get_file_info(file):
    filename = st = mode = None
    if isinstance(file, int):
        mode = file
    elif isinstance(file, os.stat_result):
        st = file
    else:
        if isinstance(file, str):
            filename = file
        elif hasattr(file, 'name') and os.path.exists(file.name):
            filename = file.name
        else:
            raise NotImplementedError(file)
        st = os.stat(filename)
    return filename, st, mode or st.st_mode


def _check_file(filename, check):
    if not isinstance(filename, str):
        raise Exception(f'filename required to check file, got {filename}')
    if check & S_IRANY:
        flags = os.O_RDONLY
    elif check & S_IWANY:
        flags = os.O_WRONLY
    elif check & S_IXANY:
        # We can worry about S_IXANY later
        return NotImplemented
    else:
        raise NotImplementedError(check)

    try:
        fd = os.open(filename, flags)
    except PermissionError:
        return False
    # We do not ignore other exceptions.
    else:
        os.close(fd)
        return True


def _get_user_info(user):
    import pwd
    username = uid = gid = groups = None
    if user is None:
        uid = os.geteuid()
        #username = os.getlogin()
        username = pwd.getpwuid(uid)[0]
        gid = os.getgid()
        groups = os.getgroups()
    else:
        if isinstance(user, int):
            uid = user
            entry = pwd.getpwuid(uid)
            username = entry.pw_name
        elif isinstance(user, str):
            username = user
            entry = pwd.getpwnam(username)
            uid = entry.pw_uid
        else:
            raise NotImplementedError(user)
        gid = entry.pw_gid
        os.getgrouplist(username, gid)
    return username, uid, gid, groups


def _check_mode(st, mode, check, user):
    orig = check
    _, uid, gid, groups = _get_user_info(user)
    if check & S_IRANY:
        check -= S_IRANY
        matched = False
        if mode & stat.S_IRUSR:
            if st.st_uid == uid:
                matched = True
        if mode & stat.S_IRGRP:
            if st.st_uid == gid or st.st_uid in groups:
                matched = True
        if mode & stat.S_IROTH:
            matched = True
        if not matched:
            return False
    if check & S_IWANY:
        check -= S_IWANY
        matched = False
        if mode & stat.S_IWUSR:
            if st.st_uid == uid:
                matched = True
        if mode & stat.S_IWGRP:
            if st.st_uid == gid or st.st_uid in groups:
                matched = True
        if mode & stat.S_IWOTH:
            matched = True
        if not matched:
            return False
    if check & S_IXANY:
        check -= S_IXANY
        matched = False
        if mode & stat.S_IXUSR:
            if st.st_uid == uid:
                matched = True
        if mode & stat.S_IXGRP:
            if st.st_uid == gid or st.st_uid in groups:
                matched = True
        if mode & stat.S_IXOTH:
            matched = True
        if not matched:
            return False
    if check:
        raise NotImplementedError((orig, check))
    return True