summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_atexit.py
blob: eb01da6e88a8bcd3d2641fe1257333a24ae3d1c9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import atexit
import os
import textwrap
import unittest
from test import support
from test.support import script_helper
from test.support import threading_helper

class GeneralTest(unittest.TestCase):
    def test_general(self):
        # Run _test_atexit.py in a subprocess since it calls atexit._clear()
        script = support.findfile("_test_atexit.py")
        script_helper.run_test_script(script)

class FunctionalTest(unittest.TestCase):
    def test_shutdown(self):
        # Actually test the shutdown mechanism in a subprocess
        code = textwrap.dedent("""
            import atexit

            def f(msg):
                print(msg)

            atexit.register(f, "one")
            atexit.register(f, "two")
        """)
        res = script_helper.assert_python_ok("-c", code)
        self.assertEqual(res.out.decode().splitlines(), ["two", "one"])
        self.assertFalse(res.err)

    def test_atexit_instances(self):
        # bpo-42639: It is safe to have more than one atexit instance.
        code = textwrap.dedent("""
            import sys
            import atexit as atexit1
            del sys.modules['atexit']
            import atexit as atexit2
            del sys.modules['atexit']

            assert atexit2 is not atexit1

            atexit1.register(print, "atexit1")
            atexit2.register(print, "atexit2")
        """)
        res = script_helper.assert_python_ok("-c", code)
        self.assertEqual(res.out.decode().splitlines(), ["atexit2", "atexit1"])
        self.assertFalse(res.err)

    @threading_helper.requires_working_threading()
    @support.requires_resource("cpu")
    @unittest.skipUnless(support.Py_GIL_DISABLED, "only meaningful without the GIL")
    def test_atexit_thread_safety(self):
        # GH-126907: atexit was not thread safe on the free-threaded build
        source = """
        from threading import Thread

        def dummy():
            pass


        def thready():
            for _ in range(100):
                atexit.register(dummy)
                atexit._clear()
                atexit.register(dummy)
                atexit.unregister(dummy)
                atexit._run_exitfuncs()


        threads = [Thread(target=thready) for _ in range(10)]
        for thread in threads:
            thread.start()

        for thread in threads:
            thread.join()
        """

        # atexit._clear() has some evil side effects, and we don't
        # want them to affect the rest of the tests.
        script_helper.assert_python_ok("-c", textwrap.dedent(source))


@support.cpython_only
class SubinterpreterTest(unittest.TestCase):

    def test_callbacks_leak(self):
        # This test shows a leak in refleak mode if atexit doesn't
        # take care to free callbacks in its per-subinterpreter module
        # state.
        n = atexit._ncallbacks()
        code = textwrap.dedent(r"""
            import atexit
            def f():
                pass
            atexit.register(f)
            del atexit
        """)
        ret = support.run_in_subinterp(code)
        self.assertEqual(ret, 0)
        self.assertEqual(atexit._ncallbacks(), n)

    def test_callbacks_leak_refcycle(self):
        # Similar to the above, but with a refcycle through the atexit
        # module.
        n = atexit._ncallbacks()
        code = textwrap.dedent(r"""
            import atexit
            def f():
                pass
            atexit.register(f)
            atexit.__atexit = atexit
        """)
        ret = support.run_in_subinterp(code)
        self.assertEqual(ret, 0)
        self.assertEqual(atexit._ncallbacks(), n)

    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
    def test_callback_on_subinterpreter_teardown(self):
        # This tests if a callback is called on
        # subinterpreter teardown.
        expected = b"The test has passed!"
        r, w = os.pipe()

        code = textwrap.dedent(r"""
            import os
            import atexit
            def callback():
                os.write({:d}, b"The test has passed!")
            atexit.register(callback)
        """.format(w))
        ret = support.run_in_subinterp(code)
        os.close(w)
        self.assertEqual(os.read(r, len(expected)), expected)
        os.close(r)


if __name__ == "__main__":
    unittest.main()