From a40f557d7b7a355a55bb90c068e3e9202fd9c8f2 Mon Sep 17 00:00:00 2001 From: Barney Gale Date: Fri, 3 May 2024 21:29:25 +0100 Subject: GH-116380: Move pathlib globbing implementation into `pathlib._glob` (#118562) Moving this code under the `pathlib` package makes it quite a lot easier to backport in the `pathlib-abc` PyPI package. It was a bit foolish of me to add it to `glob` in the first place. Also add `translate()` to `__all__` in `glob`. This function is new in 3.13, so there's no NEWS needed. --- Lib/glob.py | 306 +----------------------------------------------- Lib/pathlib/__init__.py | 5 +- Lib/pathlib/_abc.py | 7 +- Lib/pathlib/_glob.py | 305 +++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 314 insertions(+), 309 deletions(-) create mode 100644 Lib/pathlib/_glob.py diff --git a/Lib/glob.py b/Lib/glob.py index 72cf222..9fe0e9f 100644 --- a/Lib/glob.py +++ b/Lib/glob.py @@ -2,15 +2,14 @@ import contextlib import os -import re import fnmatch -import functools import itertools -import operator import stat import sys -__all__ = ["glob", "iglob", "escape"] +from pathlib._glob import translate, magic_check, magic_check_bytes + +__all__ = ["glob", "iglob", "escape", "translate"] def glob(pathname, *, root_dir=None, dir_fd=None, recursive=False, include_hidden=False): @@ -226,9 +225,6 @@ def _join(dirname, basename): return dirname or basename return os.path.join(dirname, basename) -magic_check = re.compile('([*?[])') -magic_check_bytes = re.compile(b'([*?[])') - def has_magic(s): if isinstance(s, bytes): match = magic_check_bytes.search(s) @@ -258,300 +254,4 @@ def escape(pathname): return drive + pathname -_special_parts = ('', '.', '..') _dir_open_flags = os.O_RDONLY | getattr(os, 'O_DIRECTORY', 0) -_no_recurse_symlinks = object() - - -def translate(pat, *, recursive=False, include_hidden=False, seps=None): - """Translate a pathname with shell wildcards to a regular expression. - - If `recursive` is true, the pattern segment '**' will match any number of - path segments. - - If `include_hidden` is true, wildcards can match path segments beginning - with a dot ('.'). - - If a sequence of separator characters is given to `seps`, they will be - used to split the pattern into segments and match path separators. If not - given, os.path.sep and os.path.altsep (where available) are used. - """ - if not seps: - if os.path.altsep: - seps = (os.path.sep, os.path.altsep) - else: - seps = os.path.sep - escaped_seps = ''.join(map(re.escape, seps)) - any_sep = f'[{escaped_seps}]' if len(seps) > 1 else escaped_seps - not_sep = f'[^{escaped_seps}]' - if include_hidden: - one_last_segment = f'{not_sep}+' - one_segment = f'{one_last_segment}{any_sep}' - any_segments = f'(?:.+{any_sep})?' - any_last_segments = '.*' - else: - one_last_segment = f'[^{escaped_seps}.]{not_sep}*' - one_segment = f'{one_last_segment}{any_sep}' - any_segments = f'(?:{one_segment})*' - any_last_segments = f'{any_segments}(?:{one_last_segment})?' - - results = [] - parts = re.split(any_sep, pat) - last_part_idx = len(parts) - 1 - for idx, part in enumerate(parts): - if part == '*': - results.append(one_segment if idx < last_part_idx else one_last_segment) - elif recursive and part == '**': - if idx < last_part_idx: - if parts[idx + 1] != '**': - results.append(any_segments) - else: - results.append(any_last_segments) - else: - if part: - if not include_hidden and part[0] in '*?': - results.append(r'(?!\.)') - results.extend(fnmatch._translate(part, f'{not_sep}*', not_sep)) - if idx < last_part_idx: - results.append(any_sep) - res = ''.join(results) - return fr'(?s:{res})\Z' - - -@functools.lru_cache(maxsize=512) -def _compile_pattern(pat, sep, case_sensitive, recursive=True): - """Compile given glob pattern to a re.Pattern object (observing case - sensitivity).""" - flags = re.NOFLAG if case_sensitive else re.IGNORECASE - regex = translate(pat, recursive=recursive, include_hidden=True, seps=sep) - return re.compile(regex, flags=flags).match - - -class _Globber: - """Class providing shell-style pattern matching and globbing. - """ - - def __init__(self, sep, case_sensitive, case_pedantic=False, recursive=False): - self.sep = sep - self.case_sensitive = case_sensitive - self.case_pedantic = case_pedantic - self.recursive = recursive - - # Low-level methods - - lstat = staticmethod(os.lstat) - scandir = staticmethod(os.scandir) - parse_entry = operator.attrgetter('path') - concat_path = operator.add - - if os.name == 'nt': - @staticmethod - def add_slash(pathname): - tail = os.path.splitroot(pathname)[2] - if not tail or tail[-1] in '\\/': - return pathname - return f'{pathname}\\' - else: - @staticmethod - def add_slash(pathname): - if not pathname or pathname[-1] == '/': - return pathname - return f'{pathname}/' - - # High-level methods - - def compile(self, pat): - return _compile_pattern(pat, self.sep, self.case_sensitive, self.recursive) - - def selector(self, parts): - """Returns a function that selects from a given path, walking and - filtering according to the glob-style pattern parts in *parts*. - """ - if not parts: - return self.select_exists - part = parts.pop() - if self.recursive and part == '**': - selector = self.recursive_selector - elif part in _special_parts: - selector = self.special_selector - elif not self.case_pedantic and magic_check.search(part) is None: - selector = self.literal_selector - else: - selector = self.wildcard_selector - return selector(part, parts) - - def special_selector(self, part, parts): - """Returns a function that selects special children of the given path. - """ - select_next = self.selector(parts) - - def select_special(path, exists=False): - path = self.concat_path(self.add_slash(path), part) - return select_next(path, exists) - return select_special - - def literal_selector(self, part, parts): - """Returns a function that selects a literal descendant of a path. - """ - - # Optimization: consume and join any subsequent literal parts here, - # rather than leaving them for the next selector. This reduces the - # number of string concatenation operations and calls to add_slash(). - while parts and magic_check.search(parts[-1]) is None: - part += self.sep + parts.pop() - - select_next = self.selector(parts) - - def select_literal(path, exists=False): - path = self.concat_path(self.add_slash(path), part) - return select_next(path, exists=False) - return select_literal - - def wildcard_selector(self, part, parts): - """Returns a function that selects direct children of a given path, - filtering by pattern. - """ - - match = None if part == '*' else self.compile(part) - dir_only = bool(parts) - if dir_only: - select_next = self.selector(parts) - - def select_wildcard(path, exists=False): - try: - # We must close the scandir() object before proceeding to - # avoid exhausting file descriptors when globbing deep trees. - with self.scandir(path) as scandir_it: - entries = list(scandir_it) - except OSError: - pass - else: - for entry in entries: - if match is None or match(entry.name): - if dir_only: - try: - if not entry.is_dir(): - continue - except OSError: - continue - entry_path = self.parse_entry(entry) - if dir_only: - yield from select_next(entry_path, exists=True) - else: - yield entry_path - return select_wildcard - - def recursive_selector(self, part, parts): - """Returns a function that selects a given path and all its children, - recursively, filtering by pattern. - """ - # Optimization: consume following '**' parts, which have no effect. - while parts and parts[-1] == '**': - parts.pop() - - # Optimization: consume and join any following non-special parts here, - # rather than leaving them for the next selector. They're used to - # build a regular expression, which we use to filter the results of - # the recursive walk. As a result, non-special pattern segments - # following a '**' wildcard don't require additional filesystem access - # to expand. - follow_symlinks = self.recursive is not _no_recurse_symlinks - if follow_symlinks: - while parts and parts[-1] not in _special_parts: - part += self.sep + parts.pop() - - match = None if part == '**' else self.compile(part) - dir_only = bool(parts) - select_next = self.selector(parts) - - def select_recursive(path, exists=False): - path = self.add_slash(path) - match_pos = len(str(path)) - if match is None or match(str(path), match_pos): - yield from select_next(path, exists) - stack = [path] - while stack: - yield from select_recursive_step(stack, match_pos) - - def select_recursive_step(stack, match_pos): - path = stack.pop() - try: - # We must close the scandir() object before proceeding to - # avoid exhausting file descriptors when globbing deep trees. - with self.scandir(path) as scandir_it: - entries = list(scandir_it) - except OSError: - pass - else: - for entry in entries: - is_dir = False - try: - if entry.is_dir(follow_symlinks=follow_symlinks): - is_dir = True - except OSError: - pass - - if is_dir or not dir_only: - entry_path = self.parse_entry(entry) - if match is None or match(str(entry_path), match_pos): - if dir_only: - yield from select_next(entry_path, exists=True) - else: - # Optimization: directly yield the path if this is - # last pattern part. - yield entry_path - if is_dir: - stack.append(entry_path) - - return select_recursive - - def select_exists(self, path, exists=False): - """Yields the given path, if it exists. - """ - if exists: - # Optimization: this path is already known to exist, e.g. because - # it was returned from os.scandir(), so we skip calling lstat(). - yield path - else: - try: - self.lstat(path) - yield path - except OSError: - pass - - @classmethod - def walk(cls, root, top_down, on_error, follow_symlinks): - """Walk the directory tree from the given root, similar to os.walk(). - """ - paths = [root] - while paths: - path = paths.pop() - if isinstance(path, tuple): - yield path - continue - try: - with cls.scandir(path) as scandir_it: - dirnames = [] - filenames = [] - if not top_down: - paths.append((path, dirnames, filenames)) - for entry in scandir_it: - name = entry.name - try: - if entry.is_dir(follow_symlinks=follow_symlinks): - if not top_down: - paths.append(cls.parse_entry(entry)) - dirnames.append(name) - else: - filenames.append(name) - except OSError: - filenames.append(name) - except OSError as error: - if on_error is not None: - on_error(error) - else: - if top_down: - yield path, dirnames, filenames - if dirnames: - prefix = cls.add_slash(path) - paths += [cls.concat_path(prefix, d) for d in reversed(dirnames)] diff --git a/Lib/pathlib/__init__.py b/Lib/pathlib/__init__.py index 8eecf2c..381b08c 100644 --- a/Lib/pathlib/__init__.py +++ b/Lib/pathlib/__init__.py @@ -5,7 +5,6 @@ paths with operations that have semantics appropriate for different operating systems. """ -import glob import io import ntpath import operator @@ -25,7 +24,7 @@ try: except ImportError: grp = None -from . import _abc +from . import _abc, _glob __all__ = [ @@ -113,7 +112,7 @@ class PurePath(_abc.PurePathBase): '_hash', ) parser = os.path - _globber = glob._Globber + _globber = _glob.Globber def __new__(cls, *args, **kwargs): """Construct a PurePath from one or several strings and or existing diff --git a/Lib/pathlib/_abc.py b/Lib/pathlib/_abc.py index c7e8e2f..591df443 100644 --- a/Lib/pathlib/_abc.py +++ b/Lib/pathlib/_abc.py @@ -12,11 +12,12 @@ resemble pathlib's PurePath and Path respectively. """ import functools -import glob import operator from errno import ENOENT, ENOTDIR, EBADF, ELOOP, EINVAL from stat import S_ISDIR, S_ISLNK, S_ISREG, S_ISSOCK, S_ISBLK, S_ISCHR, S_ISFIFO +from . import _glob + # # Internals # @@ -43,7 +44,7 @@ def _is_case_sensitive(parser): return parser.normcase('Aa') == 'Aa' -class Globber(glob._Globber): +class Globber(_glob.Globber): lstat = operator.methodcaller('lstat') add_slash = operator.methodcaller('joinpath', '') @@ -692,7 +693,7 @@ class PathBase(PurePathBase): # know the case sensitivity of the underlying filesystem, so we # must use scandir() for everything, including non-wildcard parts. case_pedantic = True - recursive = True if recurse_symlinks else glob._no_recurse_symlinks + recursive = True if recurse_symlinks else _glob.no_recurse_symlinks globber = self._globber(self.parser.sep, case_sensitive, case_pedantic, recursive) return globber.selector(parts) diff --git a/Lib/pathlib/_glob.py b/Lib/pathlib/_glob.py new file mode 100644 index 0000000..73ccc06 --- /dev/null +++ b/Lib/pathlib/_glob.py @@ -0,0 +1,305 @@ +import os +import re +import fnmatch +import functools +import operator + + +special_parts = ('', '.', '..') +magic_check = re.compile('([*?[])') +magic_check_bytes = re.compile(b'([*?[])') +no_recurse_symlinks = object() + + +def translate(pat, *, recursive=False, include_hidden=False, seps=None): + """Translate a pathname with shell wildcards to a regular expression. + + If `recursive` is true, the pattern segment '**' will match any number of + path segments. + + If `include_hidden` is true, wildcards can match path segments beginning + with a dot ('.'). + + If a sequence of separator characters is given to `seps`, they will be + used to split the pattern into segments and match path separators. If not + given, os.path.sep and os.path.altsep (where available) are used. + """ + if not seps: + if os.path.altsep: + seps = (os.path.sep, os.path.altsep) + else: + seps = os.path.sep + escaped_seps = ''.join(map(re.escape, seps)) + any_sep = f'[{escaped_seps}]' if len(seps) > 1 else escaped_seps + not_sep = f'[^{escaped_seps}]' + if include_hidden: + one_last_segment = f'{not_sep}+' + one_segment = f'{one_last_segment}{any_sep}' + any_segments = f'(?:.+{any_sep})?' + any_last_segments = '.*' + else: + one_last_segment = f'[^{escaped_seps}.]{not_sep}*' + one_segment = f'{one_last_segment}{any_sep}' + any_segments = f'(?:{one_segment})*' + any_last_segments = f'{any_segments}(?:{one_last_segment})?' + + results = [] + parts = re.split(any_sep, pat) + last_part_idx = len(parts) - 1 + for idx, part in enumerate(parts): + if part == '*': + results.append(one_segment if idx < last_part_idx else one_last_segment) + elif recursive and part == '**': + if idx < last_part_idx: + if parts[idx + 1] != '**': + results.append(any_segments) + else: + results.append(any_last_segments) + else: + if part: + if not include_hidden and part[0] in '*?': + results.append(r'(?!\.)') + results.extend(fnmatch._translate(part, f'{not_sep}*', not_sep)) + if idx < last_part_idx: + results.append(any_sep) + res = ''.join(results) + return fr'(?s:{res})\Z' + + +@functools.lru_cache(maxsize=512) +def compile_pattern(pat, sep, case_sensitive, recursive=True): + """Compile given glob pattern to a re.Pattern object (observing case + sensitivity).""" + flags = re.NOFLAG if case_sensitive else re.IGNORECASE + regex = translate(pat, recursive=recursive, include_hidden=True, seps=sep) + return re.compile(regex, flags=flags).match + + +class Globber: + """Class providing shell-style pattern matching and globbing. + """ + + def __init__(self, sep, case_sensitive, case_pedantic=False, recursive=False): + self.sep = sep + self.case_sensitive = case_sensitive + self.case_pedantic = case_pedantic + self.recursive = recursive + + # Low-level methods + + lstat = staticmethod(os.lstat) + scandir = staticmethod(os.scandir) + parse_entry = operator.attrgetter('path') + concat_path = operator.add + + if os.name == 'nt': + @staticmethod + def add_slash(pathname): + tail = os.path.splitroot(pathname)[2] + if not tail or tail[-1] in '\\/': + return pathname + return f'{pathname}\\' + else: + @staticmethod + def add_slash(pathname): + if not pathname or pathname[-1] == '/': + return pathname + return f'{pathname}/' + + # High-level methods + + def compile(self, pat): + return compile_pattern(pat, self.sep, self.case_sensitive, self.recursive) + + def selector(self, parts): + """Returns a function that selects from a given path, walking and + filtering according to the glob-style pattern parts in *parts*. + """ + if not parts: + return self.select_exists + part = parts.pop() + if self.recursive and part == '**': + selector = self.recursive_selector + elif part in special_parts: + selector = self.special_selector + elif not self.case_pedantic and magic_check.search(part) is None: + selector = self.literal_selector + else: + selector = self.wildcard_selector + return selector(part, parts) + + def special_selector(self, part, parts): + """Returns a function that selects special children of the given path. + """ + select_next = self.selector(parts) + + def select_special(path, exists=False): + path = self.concat_path(self.add_slash(path), part) + return select_next(path, exists) + return select_special + + def literal_selector(self, part, parts): + """Returns a function that selects a literal descendant of a path. + """ + + # Optimization: consume and join any subsequent literal parts here, + # rather than leaving them for the next selector. This reduces the + # number of string concatenation operations and calls to add_slash(). + while parts and magic_check.search(parts[-1]) is None: + part += self.sep + parts.pop() + + select_next = self.selector(parts) + + def select_literal(path, exists=False): + path = self.concat_path(self.add_slash(path), part) + return select_next(path, exists=False) + return select_literal + + def wildcard_selector(self, part, parts): + """Returns a function that selects direct children of a given path, + filtering by pattern. + """ + + match = None if part == '*' else self.compile(part) + dir_only = bool(parts) + if dir_only: + select_next = self.selector(parts) + + def select_wildcard(path, exists=False): + try: + # We must close the scandir() object before proceeding to + # avoid exhausting file descriptors when globbing deep trees. + with self.scandir(path) as scandir_it: + entries = list(scandir_it) + except OSError: + pass + else: + for entry in entries: + if match is None or match(entry.name): + if dir_only: + try: + if not entry.is_dir(): + continue + except OSError: + continue + entry_path = self.parse_entry(entry) + if dir_only: + yield from select_next(entry_path, exists=True) + else: + yield entry_path + return select_wildcard + + def recursive_selector(self, part, parts): + """Returns a function that selects a given path and all its children, + recursively, filtering by pattern. + """ + # Optimization: consume following '**' parts, which have no effect. + while parts and parts[-1] == '**': + parts.pop() + + # Optimization: consume and join any following non-special parts here, + # rather than leaving them for the next selector. They're used to + # build a regular expression, which we use to filter the results of + # the recursive walk. As a result, non-special pattern segments + # following a '**' wildcard don't require additional filesystem access + # to expand. + follow_symlinks = self.recursive is not no_recurse_symlinks + if follow_symlinks: + while parts and parts[-1] not in special_parts: + part += self.sep + parts.pop() + + match = None if part == '**' else self.compile(part) + dir_only = bool(parts) + select_next = self.selector(parts) + + def select_recursive(path, exists=False): + path = self.add_slash(path) + match_pos = len(str(path)) + if match is None or match(str(path), match_pos): + yield from select_next(path, exists) + stack = [path] + while stack: + yield from select_recursive_step(stack, match_pos) + + def select_recursive_step(stack, match_pos): + path = stack.pop() + try: + # We must close the scandir() object before proceeding to + # avoid exhausting file descriptors when globbing deep trees. + with self.scandir(path) as scandir_it: + entries = list(scandir_it) + except OSError: + pass + else: + for entry in entries: + is_dir = False + try: + if entry.is_dir(follow_symlinks=follow_symlinks): + is_dir = True + except OSError: + pass + + if is_dir or not dir_only: + entry_path = self.parse_entry(entry) + if match is None or match(str(entry_path), match_pos): + if dir_only: + yield from select_next(entry_path, exists=True) + else: + # Optimization: directly yield the path if this is + # last pattern part. + yield entry_path + if is_dir: + stack.append(entry_path) + + return select_recursive + + def select_exists(self, path, exists=False): + """Yields the given path, if it exists. + """ + if exists: + # Optimization: this path is already known to exist, e.g. because + # it was returned from os.scandir(), so we skip calling lstat(). + yield path + else: + try: + self.lstat(path) + yield path + except OSError: + pass + + @classmethod + def walk(cls, root, top_down, on_error, follow_symlinks): + """Walk the directory tree from the given root, similar to os.walk(). + """ + paths = [root] + while paths: + path = paths.pop() + if isinstance(path, tuple): + yield path + continue + try: + with cls.scandir(path) as scandir_it: + dirnames = [] + filenames = [] + if not top_down: + paths.append((path, dirnames, filenames)) + for entry in scandir_it: + name = entry.name + try: + if entry.is_dir(follow_symlinks=follow_symlinks): + if not top_down: + paths.append(cls.parse_entry(entry)) + dirnames.append(name) + else: + filenames.append(name) + except OSError: + filenames.append(name) + except OSError as error: + if on_error is not None: + on_error(error) + else: + if top_down: + yield path, dirnames, filenames + if dirnames: + prefix = cls.add_slash(path) + paths += [cls.concat_path(prefix, d) for d in reversed(dirnames)] -- cgit v0.12