summaryrefslogtreecommitdiffstats
path: root/Lib/fnmatch.py
blob: 6330b0cfdab5daf1eda2882c3b3eeb8afd6dff70 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
"""Filename matching with shell patterns.

fnmatch(FILENAME, PATTERN) matches according to the local convention.
fnmatchcase(FILENAME, PATTERN) always takes case in account.

The functions operate by translating the pattern into a regular
expression.  They cache the compiled regular expressions for speed.

The function translate(PATTERN) returns a regular expression
corresponding to PATTERN.  (It does not compile it.)
"""
import os
import posixpath
import re
import functools

__all__ = ["filter", "fnmatch", "fnmatchcase", "translate"]

def fnmatch(name, pat):
    """Return whether FILENAME matches the shell-style PATTERN.

    Supported pattern syntax (Unix shell style):

    *       matches everything
    ?       matches any single character
    [seq]   matches any character in seq
    [!seq]  matches any char not in seq

    A leading period in FILENAME gets no special treatment.
    Both FILENAME and PATTERN are passed through os.path.normcase()
    before matching, so they are case-normalized only where the
    operating system requires it.  Call fnmatchcase(FILENAME, PATTERN)
    directly to skip that normalization.
    """
    return fnmatchcase(os.path.normcase(name), os.path.normcase(pat))

@functools.lru_cache(maxsize=256, typed=True)
def _compile_pattern(pat):
    """Return the ``match`` method of the regex compiled from shell pattern *pat*.

    *pat* may be str or bytes.  Results are memoized by lru_cache (up to
    256 entries, keyed by argument type as well as value).
    """
    if not isinstance(pat, bytes):
        return re.compile(translate(pat)).match
    # translate() works on str: round-trip the bytes pattern through
    # ISO-8859-1 and compile the result as a bytes regex.
    regex_text = translate(pat.decode('ISO-8859-1'))
    return re.compile(regex_text.encode('ISO-8859-1')).match

def filter(names, pat):
    """Return a list of the entries of NAMES that match PAT."""
    match = _compile_pattern(os.path.normcase(pat))
    if os.path is posixpath:
        # normcase on posix is a no-op, so the names are matched as given.
        return [name for name in names if match(name)]
    # Elsewhere each name is normalized for the comparison only; the
    # returned entries are the original, un-normalized names.
    return [name for name in names if match(os.path.normcase(name))]

def fnmatchcase(name, pat):
    """Return whether FILENAME matches PATTERN, taking case into account.

    Unlike fnmatch(), neither argument is case-normalized before the
    comparison.
    """
    return _compile_pattern(pat)(name) is not None


def translate(pat):
    """Translate a shell PATTERN to a regular expression.

    There is no way to quote meta-characters.

    The result is a str (it is not compiled) intended for use with
    re.match(): it starts with the inline flags ``(?ms)`` and ends with
    ``\\Z`` so that the whole name has to match.
    """
    i, n = 0, len(pat)
    parts = []  # regex fragments, joined once at the end
    while i < n:
        c = pat[i]
        i += 1
        if c == '*':
            parts.append('.*')
        elif c == '?':
            parts.append('.')
        elif c == '[':
            # Look for the closing ']'.  A '!' right after '[' and a ']'
            # in first position belong to the set, so skip over them.
            j = i
            if j < n and pat[j] == '!':
                j += 1
            if j < n and pat[j] == ']':
                j += 1
            while j < n and pat[j] != ']':
                j += 1
            if j >= n:
                # No closing bracket: the '[' is an ordinary character.
                parts.append('\\[')
            else:
                stuff = pat[i:j].replace('\\', '\\\\')
                i = j + 1
                if stuff[0] == '!':
                    # Shell negation becomes regex negation.
                    stuff = '^' + stuff[1:]
                elif stuff[0] == '^':
                    # A leading '^' is literal in shell patterns.
                    stuff = '\\' + stuff
                parts.append('[%s]' % stuff)
        else:
            parts.append(re.escape(c))
    # Global inline flags have to come first: the re module deprecated
    # flags placed elsewhere and newer versions refuse to compile them.
    # The raw string avoids the invalid '\Z' string escape.
    return '(?ms)' + ''.join(parts) + r'\Z'
f='#n473'>473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519
"""
Tests for object finalization semantics, as outlined in PEP 442.
"""

import contextlib
import gc
import unittest
import weakref

try:
    # with_tp_del is used below as a class decorator on the Legacy* classes.
    from _testcapi import with_tp_del
except ImportError:
    # _testcapi is unavailable: keep the module importable by substituting
    # a decorator that swaps the decorated class for a stub.  Instantiating
    # the stub raises TypeError, so only code that actually creates a
    # Legacy* object fails.
    def with_tp_del(cls):
        class C(object):
            def __new__(cls, *args, **kwargs):
                raise TypeError('requires _testcapi.with_tp_del')
        return C

from test import support


class NonGCSimpleBase:
    """
    The base class for all the objects under test, equipped with various
    testing features.

    The bookkeeping lists below are class attributes, so they are shared
    by this class and every subclass that does not rebind them.
    """

    # Objects kept alive by a resurrecting side_effect().
    survivors = []
    # id() of each object whose __del__ ran while not cleaning up.
    del_calls = []
    # id() of each object whose legacy __tp_del__ ran (see LegacyBase).
    tp_del_calls = []
    # Exceptions caught inside finalizers; test() re-raises the first one.
    errors = []

    # True while test() is tearing down; finalizers then record nothing.
    _cleaning = False

    # No instance attributes are declared at this level.
    __slots__ = ()

    @classmethod
    def _cleanup(cls):
        """
        Reset the shared bookkeeping and run a collection.
        """
        cls.survivors.clear()
        cls.errors.clear()
        gc.garbage.clear()
        gc.collect()
        # The call logs are cleared only after the collection above.
        cls.del_calls.clear()
        cls.tp_del_calls.clear()

    @classmethod
    @contextlib.contextmanager
    def test(cls):
        """
        A context manager to use around all finalization tests.

        Runs the body inside support.disable_gc() with empty call logs.
        If a finalizer recorded an exception, the first one is re-raised
        once the body completes.  The bookkeeping is always reset on exit.
        """
        with support.disable_gc():
            cls.del_calls.clear()
            cls.tp_del_calls.clear()
            # Set on the base class itself so that every subclass sees it.
            NonGCSimpleBase._cleaning = False
            try:
                yield
                if cls.errors:
                    raise cls.errors[0]
            finally:
                # Finalizers triggered by the cleanup must not record anything.
                NonGCSimpleBase._cleaning = True
                cls._cleanup()

    def check_sanity(self):
        """
        Check the object is sane (non-broken).
        """

    def __del__(self):
        """
        PEP 442 finalizer.  Record that this was called, check the
        object is in a sane state, and invoke a side effect.
        """
        try:
            if not self._cleaning:
                self.del_calls.append(id(self))
                self.check_sanity()
                self.side_effect()
        except Exception as e:
            # Stash the failure so that test() can re-raise it later.
            self.errors.append(e)

    def side_effect(self):
        """
        A side effect called on destruction.
        """


class SimpleBase(NonGCSimpleBase):
    """
    Base for test objects that carry instance state: each instance
    remembers its own id() so that check_sanity() can verify it.
    """

    def __init__(self):
        self.id_ = id(self)

    def check_sanity(self):
        # The stored id must still match the object being finalized.
        assert self.id_ == id(self)


class NonGC(NonGCSimpleBase):
    # Keeps the empty __slots__ of its base; test_non_gc asserts that
    # instances are not tracked by the garbage collector.
    __slots__ = ()

class NonGCResurrector(NonGCSimpleBase):
    # Same empty __slots__ layout as NonGC; test_non_gc_resurrect asserts
    # that instances are not tracked by the garbage collector.
    __slots__ = ()

    def side_effect(self):
        """
        Resurrect self by storing self in a class-wide list.
        """
        self.survivors.append(self)

class Simple(SimpleBase):
    """
    A plain finalizable object: everything is inherited from SimpleBase,
    including the no-op side_effect().
    """

class SimpleResurrector(NonGCResurrector, SimpleBase):
    """
    A SimpleBase object whose side_effect(), inherited from
    NonGCResurrector, stores the object in the survivors list.
    """


class TestBase:
    """
    Mixin with the fixtures and assertion helpers shared by the
    finalization test cases.  It relies on the assertEqual() of the
    TestCase class it is combined with.
    """

    def setUp(self):
        # Remember what gc.garbage held, then empty it in place.
        self.old_garbage = list(gc.garbage)
        del gc.garbage[:]

    def tearDown(self):
        # None of the tests here should put anything in gc.garbage
        try:
            self.assertEqual(gc.garbage, [])
        finally:
            del self.old_garbage
            gc.collect()

    def assert_del_calls(self, ids):
        """Check the recorded __del__ calls against ids, ignoring order."""
        recorded = sorted(SimpleBase.del_calls)
        self.assertEqual(recorded, sorted(ids))

    def assert_tp_del_calls(self, ids):
        """Check the recorded tp_del calls against ids, ignoring order."""
        recorded = sorted(SimpleBase.tp_del_calls)
        self.assertEqual(recorded, sorted(ids))

    def assert_survivors(self, ids):
        """Check the ids of the resurrected objects, ignoring order."""
        alive = sorted(map(id, SimpleBase.survivors))
        self.assertEqual(alive, sorted(ids))

    def assert_garbage(self, ids):
        """Check the ids of the objects in gc.garbage, ignoring order."""
        uncollected = sorted(map(id, gc.garbage))
        self.assertEqual(uncollected, sorted(ids))

    def clear_survivors(self):
        """Drop the references that keep resurrected objects alive."""
        del SimpleBase.survivors[:]


class SimpleFinalizationTest(TestBase, unittest.TestCase):
    """
    Test finalization without refcycles.
    """

    def test_simple(self):
        # A plain object: __del__ is recorded once and nothing survives.
        with SimpleBase.test():
            s = Simple()
            ids = [id(s)]
            wr = weakref.ref(s)
            del s
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors([])
            self.assertIs(wr(), None)
            # A further collection must not record another __del__ call.
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors([])

    def test_simple_resurrect(self):
        # The object stores itself in survivors from its finalizer.
        with SimpleBase.test():
            s = SimpleResurrector()
            ids = [id(s)]
            wr = weakref.ref(s)
            del s
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors(ids)
            # Still referenced from survivors, so the weakref is alive.
            self.assertIsNot(wr(), None)
            # Dropping the survivor must not record a second __del__ call,
            # and the object must not come back.
            self.clear_survivors()
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors([])
        self.assertIs(wr(), None)

    def test_non_gc(self):
        # Same as test_simple, for an object the GC does not track.
        with SimpleBase.test():
            s = NonGC()
            self.assertFalse(gc.is_tracked(s))
            ids = [id(s)]
            del s
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors([])
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors([])

    def test_non_gc_resurrect(self):
        # A resurrecting object that the GC does not track.
        with SimpleBase.test():
            s = NonGCResurrector()
            self.assertFalse(gc.is_tracked(s))
            ids = [id(s)]
            del s
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors(ids)
            # Unlike test_simple_resurrect, the expectation here is that
            # dropping the survivor records a second __del__ call and the
            # object is stored in survivors again.
            self.clear_survivors()
            gc.collect()
            self.assert_del_calls(ids * 2)
            self.assert_survivors(ids)


class SelfCycleBase:
    """
    Mixin for objects that hold a reference to themselves in ``ref``.
    It is combined with a SimpleBase-derived class, whose __init__() and
    check_sanity() are reached through super().
    """

    def __init__(self):
        super().__init__()
        # Create the one-object reference cycle.
        self.ref = self

    def check_sanity(self):
        super().check_sanity()
        # The self-reference must still be intact at finalization time.
        assert self.ref is self

class SimpleSelfCycle(SelfCycleBase, Simple):
    """
    A Simple object that references itself and has no side effect.
    """

class SelfCycleResurrector(SelfCycleBase, SimpleResurrector):
    """
    A self-referencing object that stores itself in survivors when it
    is finalized.
    """

class SuicidalSelfCycle(SelfCycleBase, Simple):
    """
    A self-referencing object that clears its own ``ref`` attribute
    from its finalizer.
    """

    def side_effect(self):
        """
        Explicitly break the reference cycle.
        """
        self.ref = None


class SelfCycleFinalizationTest(TestBase, unittest.TestCase):
    """
    Test finalization of an object having a single cyclic reference to
    itself.
    """

    def test_simple(self):
        # The self-cycle is collected: one __del__ call, no survivor.
        with SimpleBase.test():
            s = SimpleSelfCycle()
            ids = [id(s)]
            wr = weakref.ref(s)
            del s
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors([])
            self.assertIs(wr(), None)
            # A second collection must not record another __del__ call.
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors([])

    def test_simple_resurrect(self):
        # Test that __del__ can resurrect the object being finalized.
        with SimpleBase.test():
            s = SelfCycleResurrector()
            ids = [id(s)]
            wr = weakref.ref(s)
            del s
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors(ids)
            # The weakref is expected to be dead even though the object
            # itself survived.
            # XXX is this desirable?
            self.assertIs(wr(), None)
            # When trying to destroy the object a second time, __del__
            # isn't called anymore (and the object isn't resurrected).
            self.clear_survivors()
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors([])
            self.assertIs(wr(), None)

    def test_simple_suicide(self):
        # Test the GC is able to deal with an object that kills its last
        # reference during __del__.
        with SimpleBase.test():
            s = SuicidalSelfCycle()
            ids = [id(s)]
            wr = weakref.ref(s)
            del s
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors([])
            self.assertIs(wr(), None)
            # Nothing more may happen on a second collection.
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors([])
            self.assertIs(wr(), None)


class ChainedBase:
    """
    Mixin for nodes of a doubly linked chain (``left`` / ``right``).
    A node whose ``suicided`` flag is set is expected to have dropped
    both of its links.
    """

    def chain(self, left):
        """
        Attach self to the right of *left* and mark self as not suicided.
        """
        self.suicided = False
        self.left, left.right = left, self

    def check_sanity(self):
        """
        Check the links of this node against the state of its neighbours.
        """
        super().check_sanity()
        if self.suicided:
            assert self.left is None
            assert self.right is None
            return
        # A neighbour that suicided no longer points back at this node;
        # any other neighbour must still do so.
        left_node = self.left
        expected = None if left_node.suicided else self
        assert left_node.right is expected
        right_node = self.right
        expected = None if right_node.suicided else self
        assert right_node.left is expected

class SimpleChained(ChainedBase, Simple):
    """
    A chain node with no side effect on finalization.
    """

class ChainedResurrector(ChainedBase, SimpleResurrector):
    """
    A chain node that stores itself in survivors when it is finalized.
    """

class SuicidalChained(ChainedBase, Simple):
    """
    A chain node that drops both of its links from its finalizer.
    """

    def side_effect(self):
        """
        Explicitly break the reference cycle.
        """
        # Set the flag before clearing the links, so that the
        # check_sanity() of the neighbours knows what to expect.
        self.suicided = True
        self.left = None
        self.right = None


class CycleChainFinalizationTest(TestBase, unittest.TestCase):
    """
    Test finalization of a cyclic chain.  These tests are similar in
    spirit to the self-cycle tests above, but the collectable object
    graph isn't trivial anymore.
    """

    def build_chain(self, classes):
        """
        Instantiate each class and link the instances into a ring.
        """
        nodes = [cls() for cls in classes]
        for i, node in enumerate(nodes):
            # For i == 0 this is nodes[-1], which closes the ring.
            node.chain(nodes[i - 1])
        return nodes

    def check_non_resurrecting_chain(self, classes):
        """
        Check a chain in which no node survives finalization.
        """
        N = len(classes)
        with SimpleBase.test():
            nodes = self.build_chain(classes)
            ids = [id(s) for s in nodes]
            wrs = [weakref.ref(s) for s in nodes]
            del nodes
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors([])
            self.assertEqual([wr() for wr in wrs], [None] * N)
            # A second collection must not record any new __del__ call.
            gc.collect()
            self.assert_del_calls(ids)

    def check_resurrecting_chain(self, classes):
        """
        Check a chain in which the SimpleResurrector nodes survive the
        first collection.
        """
        N = len(classes)
        with SimpleBase.test():
            nodes = self.build_chain(classes)
            ids = [id(s) for s in nodes]
            survivor_ids = [id(s) for s in nodes if isinstance(s, SimpleResurrector)]
            wrs = [weakref.ref(s) for s in nodes]
            del nodes
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors(survivor_ids)
            # XXX desirable?
            self.assertEqual([wr() for wr in wrs], [None] * N)
            # Once the survivors are dropped, no finalizer runs again.
            self.clear_survivors()
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors([])

    def test_homogenous(self):
        self.check_non_resurrecting_chain([SimpleChained] * 3)

    def test_homogenous_resurrect(self):
        self.check_resurrecting_chain([ChainedResurrector] * 3)

    def test_homogenous_suicidal(self):
        self.check_non_resurrecting_chain([SuicidalChained] * 3)

    def test_heterogenous_suicidal_one(self):
        self.check_non_resurrecting_chain([SuicidalChained, SimpleChained] * 2)

    def test_heterogenous_suicidal_two(self):
        self.check_non_resurrecting_chain(
            [SuicidalChained] * 2 + [SimpleChained] * 2)

    def test_heterogenous_resurrect_one(self):
        self.check_resurrecting_chain([ChainedResurrector, SimpleChained] * 2)

    def test_heterogenous_resurrect_two(self):
        self.check_resurrecting_chain(
            [ChainedResurrector, SimpleChained, SuicidalChained] * 2)

    def test_heterogenous_resurrect_three(self):
        self.check_resurrecting_chain(
            [ChainedResurrector] * 2 + [SimpleChained] * 2 + [SuicidalChained] * 2)


# NOTE: the tp_del slot isn't automatically inherited, so we have to call
# with_tp_del() for each instantiated class.

class LegacyBase(SimpleBase):
    """
    Base for objects that have both a __del__ and a legacy __tp_del__
    finalizer.  Here the side effect is triggered from __tp_del__ only.
    """

    def __del__(self):
        """
        Record the call and check the object, without any side effect.
        """
        try:
            # Do not invoke side_effect here, since we are now exercising
            # the tp_del slot.
            if not self._cleaning:
                self.del_calls.append(id(self))
                self.check_sanity()
        except Exception as e:
            # Stash the failure so that the test() context can re-raise it.
            self.errors.append(e)

    def __tp_del__(self):
        """
        Legacy (pre-PEP 442) finalizer, mapped to a tp_del slot.
        """
        try:
            if not self._cleaning:
                self.tp_del_calls.append(id(self))
                self.check_sanity()
                self.side_effect()
        except Exception as e:
            # Same error reporting as in __del__.
            self.errors.append(e)

@with_tp_del
class Legacy(LegacyBase):
    """
    A LegacyBase object with no side effect, passed through
    with_tp_del().
    """

@with_tp_del
class LegacyResurrector(LegacyBase):
    """
    A LegacyBase object that stores itself in survivors from its
    __tp_del__ finalizer (the only place LegacyBase calls side_effect()).
    """

    def side_effect(self):
        """
        Resurrect self by storing self in a class-wide list.
        """
        self.survivors.append(self)

@with_tp_del
class LegacySelfCycle(SelfCycleBase, LegacyBase):
    """
    A LegacyBase object that references itself, passed through
    with_tp_del().
    """


@support.cpython_only
class LegacyFinalizationTest(TestBase, unittest.TestCase):
    """
    Test finalization of objects with a tp_del.
    """

    def tearDown(self):
        # These tests need to clean up a bit more, since they create
        # uncollectable objects.
        gc.garbage.clear()
        gc.collect()
        super().tearDown()

    def test_legacy(self):
        # Both __del__ and tp_del are recorded once; nothing survives.
        with SimpleBase.test():
            s = Legacy()
            ids = [id(s)]
            wr = weakref.ref(s)
            del s
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_tp_del_calls(ids)
            self.assert_survivors([])
            self.assertIs(wr(), None)
            # A second collection must not record any new finalizer call.
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_tp_del_calls(ids)

    def test_legacy_resurrect(self):
        # The object stores itself in survivors from its tp_del finalizer.
        with SimpleBase.test():
            s = LegacyResurrector()
            ids = [id(s)]
            wr = weakref.ref(s)
            del s
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_tp_del_calls(ids)
            self.assert_survivors(ids)
            # weakrefs are cleared before tp_del is called.
            self.assertIs(wr(), None)
            # Dropping the survivor: __del__ is not expected to be
            # recorded again, but tp_del is, and the object is stored in
            # survivors once more.
            self.clear_survivors()
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_tp_del_calls(ids * 2)
            self.assert_survivors(ids)
        self.assertIs(wr(), None)

    def test_legacy_self_cycle(self):
        # Self-cycles with legacy finalizers end up in gc.garbage.
        with SimpleBase.test():
            s = LegacySelfCycle()
            ids = [id(s)]
            wr = weakref.ref(s)
            del s
            gc.collect()
            # No finalizer has run; the object sits in gc.garbage, alive.
            self.assert_del_calls([])
            self.assert_tp_del_calls([])
            self.assert_survivors([])
            self.assert_garbage(ids)
            self.assertIsNot(wr(), None)
            # Break the cycle to allow collection
            gc.garbage[0].ref = None
        # Leaving the test() context has emptied gc.garbage and collected.
        self.assert_garbage([])
        self.assertIs(wr(), None)


# Run the tests of this module when it is executed as a script.
if __name__ == "__main__":
    unittest.main()