1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
|
import fnmatch
import glob
import os
import os.path
import shutil
import stat
from .iterutil import iter_many
USE_CWD = object()
C_SOURCE_SUFFIXES = ('.c', '.h')
def create_backup(old, backup=None):
if isinstance(old, str):
filename = old
else:
filename = getattr(old, 'name', None)
if not filename:
return None
if not backup or backup is True:
backup = f'{filename}.bak'
try:
shutil.copyfile(filename, backup)
except FileNotFoundError as exc:
if exc.filename != filename:
raise # re-raise
backup = None
return backup
##################################
# filenames
def fix_filename(filename, relroot=USE_CWD, *,
fixroot=True,
_badprefix=f'..{os.path.sep}',
):
"""Return a normalized, absolute-path copy of the given filename."""
if not relroot or relroot is USE_CWD:
return os.path.abspath(filename)
if fixroot:
relroot = os.path.abspath(relroot)
return _fix_filename(filename, relroot)
def _fix_filename(filename, relroot, *,
_badprefix=f'..{os.path.sep}',
):
orig = filename
# First we normalize.
filename = os.path.normpath(filename)
if filename.startswith(_badprefix):
raise ValueError(f'bad filename {orig!r} (resolves beyond relative root')
# Now make sure it is absolute (relative to relroot).
if not os.path.isabs(filename):
filename = os.path.join(relroot, filename)
else:
relpath = os.path.relpath(filename, relroot)
if os.path.join(relroot, relpath) != filename:
raise ValueError(f'expected {relroot!r} as lroot, got {orig!r}')
return filename
def fix_filenames(filenames, relroot=USE_CWD):
if not relroot or relroot is USE_CWD:
filenames = (os.path.abspath(v) for v in filenames)
else:
relroot = os.path.abspath(relroot)
filenames = (_fix_filename(v, relroot) for v in filenames)
return filenames, relroot
def format_filename(filename, relroot=USE_CWD, *,
fixroot=True,
normalize=True,
_badprefix=f'..{os.path.sep}',
):
"""Return a consistent relative-path representation of the filename."""
orig = filename
if normalize:
filename = os.path.normpath(filename)
if relroot is None:
# Otherwise leave it as-is.
return filename
elif relroot is USE_CWD:
# Make it relative to CWD.
filename = os.path.relpath(filename)
else:
# Make it relative to "relroot".
if fixroot:
relroot = os.path.abspath(relroot)
elif not relroot:
raise ValueError('missing relroot')
filename = os.path.relpath(filename, relroot)
if filename.startswith(_badprefix):
raise ValueError(f'bad filename {orig!r} (resolves beyond relative root')
return filename
def match_path_tail(path1, path2):
"""Return True if one path ends the other."""
if path1 == path2:
return True
if os.path.isabs(path1):
if os.path.isabs(path2):
return False
return _match_tail(path1, path2)
elif os.path.isabs(path2):
return _match_tail(path2, path1)
else:
return _match_tail(path1, path2) or _match_tail(path2, path1)
def _match_tail(path, tail):
assert not os.path.isabs(tail), repr(tail)
return path.endswith(os.path.sep + tail)
##################################
# find files
def match_glob(filename, pattern):
if fnmatch.fnmatch(filename, pattern):
return True
# fnmatch doesn't handle ** quite right. It will not match the
# following:
#
# ('x/spam.py', 'x/**/*.py')
# ('spam.py', '**/*.py')
#
# though it *will* match the following:
#
# ('x/y/spam.py', 'x/**/*.py')
# ('x/spam.py', '**/*.py')
if '**/' not in pattern:
return False
# We only accommodate the single-"**" case.
return fnmatch.fnmatch(filename, pattern.replace('**/', '', 1))
def process_filenames(filenames, *,
start=None,
include=None,
exclude=None,
relroot=USE_CWD,
):
if relroot and relroot is not USE_CWD:
relroot = os.path.abspath(relroot)
if start:
start = fix_filename(start, relroot, fixroot=False)
if include:
include = set(fix_filename(v, relroot, fixroot=False)
for v in include)
if exclude:
exclude = set(fix_filename(v, relroot, fixroot=False)
for v in exclude)
onempty = Exception('no filenames provided')
for filename, solo in iter_many(filenames, onempty):
filename = fix_filename(filename, relroot, fixroot=False)
relfile = format_filename(filename, relroot, fixroot=False, normalize=False)
check, start = _get_check(filename, start, include, exclude)
yield filename, relfile, check, solo
def expand_filenames(filenames):
for filename in filenames:
# XXX Do we need to use glob.escape (a la commit 9355868458, GH-20994)?
if '**/' in filename:
yield from glob.glob(filename.replace('**/', ''))
yield from glob.glob(filename)
def _get_check(filename, start, include, exclude):
if start and filename != start:
return (lambda: '<skipped>'), start
else:
def check():
if _is_excluded(filename, exclude, include):
return '<excluded>'
return None
return check, None
def _is_excluded(filename, exclude, include):
if include:
for included in include:
if match_glob(filename, included):
return False
return True
elif exclude:
for excluded in exclude:
if match_glob(filename, excluded):
return True
return False
else:
return False
def _walk_tree(root, *,
_walk=os.walk,
):
# A wrapper around os.walk that resolves the filenames.
for parent, _, names in _walk(root):
for name in names:
yield os.path.join(parent, name)
def walk_tree(root, *,
suffix=None,
walk=_walk_tree,
):
"""Yield each file in the tree under the given directory name.
If "suffix" is provided then only files with that suffix will
be included.
"""
if suffix and not isinstance(suffix, str):
raise ValueError('suffix must be a string')
for filename in walk(root):
if suffix and not filename.endswith(suffix):
continue
yield filename
def glob_tree(root, *,
suffix=None,
_glob=glob.iglob,
):
"""Yield each file in the tree under the given directory name.
If "suffix" is provided then only files with that suffix will
be included.
"""
suffix = suffix or ''
if not isinstance(suffix, str):
raise ValueError('suffix must be a string')
for filename in _glob(f'{root}/*{suffix}'):
yield filename
for filename in _glob(f'{root}/**/*{suffix}'):
yield filename
def iter_files(root, suffix=None, relparent=None, *,
get_files=os.walk,
_glob=glob_tree,
_walk=walk_tree,
):
"""Yield each file in the tree under the given directory name.
If "root" is a non-string iterable then do the same for each of
those trees.
If "suffix" is provided then only files with that suffix will
be included.
if "relparent" is provided then it is used to resolve each
filename as a relative path.
"""
if not isinstance(root, str):
roots = root
for root in roots:
yield from iter_files(root, suffix, relparent,
get_files=get_files,
_glob=_glob, _walk=_walk)
return
# Use the right "walk" function.
if get_files in (glob.glob, glob.iglob, glob_tree):
get_files = _glob
else:
_files = _walk_tree if get_files in (os.walk, walk_tree) else get_files
get_files = (lambda *a, **k: _walk(*a, walk=_files, **k))
# Handle a single suffix.
if suffix and not isinstance(suffix, str):
filenames = get_files(root)
suffix = tuple(suffix)
else:
filenames = get_files(root, suffix=suffix)
suffix = None
for filename in filenames:
if suffix and not isinstance(suffix, str): # multiple suffixes
if not filename.endswith(suffix):
continue
if relparent:
filename = os.path.relpath(filename, relparent)
yield filename
def iter_files_by_suffix(root, suffixes, relparent=None, *,
walk=walk_tree,
_iter_files=iter_files,
):
"""Yield each file in the tree that has the given suffixes.
Unlike iter_files(), the results are in the original suffix order.
"""
if isinstance(suffixes, str):
suffixes = [suffixes]
# XXX Ignore repeated suffixes?
for suffix in suffixes:
yield from _iter_files(root, suffix, relparent)
##################################
# file info
# XXX posix-only?
S_IRANY = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
S_IWANY = stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
S_IXANY = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
def is_readable(file, *, user=None, check=False):
filename, st, mode = _get_file_info(file)
if check:
try:
okay = _check_file(filename, S_IRANY)
except NotImplementedError:
okay = NotImplemented
if okay is not NotImplemented:
return okay
# Fall back to checking the mode.
return _check_mode(st, mode, S_IRANY, user)
def is_writable(file, *, user=None, check=False):
filename, st, mode = _get_file_info(file)
if check:
try:
okay = _check_file(filename, S_IWANY)
except NotImplementedError:
okay = NotImplemented
if okay is not NotImplemented:
return okay
# Fall back to checking the mode.
return _check_mode(st, mode, S_IWANY, user)
def is_executable(file, *, user=None, check=False):
filename, st, mode = _get_file_info(file)
if check:
try:
okay = _check_file(filename, S_IXANY)
except NotImplementedError:
okay = NotImplemented
if okay is not NotImplemented:
return okay
# Fall back to checking the mode.
return _check_mode(st, mode, S_IXANY, user)
def _get_file_info(file):
filename = st = mode = None
if isinstance(file, int):
mode = file
elif isinstance(file, os.stat_result):
st = file
else:
if isinstance(file, str):
filename = file
elif hasattr(file, 'name') and os.path.exists(file.name):
filename = file.name
else:
raise NotImplementedError(file)
st = os.stat(filename)
return filename, st, mode or st.st_mode
def _check_file(filename, check):
if not isinstance(filename, str):
raise Exception(f'filename required to check file, got {filename}')
if check & S_IRANY:
flags = os.O_RDONLY
elif check & S_IWANY:
flags = os.O_WRONLY
elif check & S_IXANY:
# We can worry about S_IXANY later
return NotImplemented
else:
raise NotImplementedError(check)
try:
fd = os.open(filename, flags)
except PermissionError:
return False
# We do not ignore other exceptions.
else:
os.close(fd)
return True
def _get_user_info(user):
import pwd
username = uid = gid = groups = None
if user is None:
uid = os.geteuid()
#username = os.getlogin()
username = pwd.getpwuid(uid)[0]
gid = os.getgid()
groups = os.getgroups()
else:
if isinstance(user, int):
uid = user
entry = pwd.getpwuid(uid)
username = entry.pw_name
elif isinstance(user, str):
username = user
entry = pwd.getpwnam(username)
uid = entry.pw_uid
else:
raise NotImplementedError(user)
gid = entry.pw_gid
os.getgrouplist(username, gid)
return username, uid, gid, groups
def _check_mode(st, mode, check, user):
orig = check
_, uid, gid, groups = _get_user_info(user)
if check & S_IRANY:
check -= S_IRANY
matched = False
if mode & stat.S_IRUSR:
if st.st_uid == uid:
matched = True
if mode & stat.S_IRGRP:
if st.st_uid == gid or st.st_uid in groups:
matched = True
if mode & stat.S_IROTH:
matched = True
if not matched:
return False
if check & S_IWANY:
check -= S_IWANY
matched = False
if mode & stat.S_IWUSR:
if st.st_uid == uid:
matched = True
if mode & stat.S_IWGRP:
if st.st_uid == gid or st.st_uid in groups:
matched = True
if mode & stat.S_IWOTH:
matched = True
if not matched:
return False
if check & S_IXANY:
check -= S_IXANY
matched = False
if mode & stat.S_IXUSR:
if st.st_uid == uid:
matched = True
if mode & stat.S_IXGRP:
if st.st_uid == gid or st.st_uid in groups:
matched = True
if mode & stat.S_IXOTH:
matched = True
if not matched:
return False
if check:
raise NotImplementedError((orig, check))
return True
|