summaryrefslogtreecommitdiffstats
path: root/Lib/test/libregrtest/runtest.py
blob: 489ab986cb4e5e4d1ddc68263580eb1349c1a9b1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
import faulthandler
import functools
import gc
import importlib
import io
import os
import sys
import time
import traceback
import unittest

from test import support
from test.support import os_helper
from test.libregrtest.cmdline import Namespace
from test.libregrtest.save_env import saved_test_environment
from test.libregrtest.utils import clear_caches, format_duration, print_warning


class TestResult:
    def __init__(
        self,
        name: str,
        duration_sec: float = 0.0,
        xml_data: list[str] | None = None,
    ) -> None:
        self.name = name
        self.duration_sec = duration_sec
        self.xml_data = xml_data

    def __str__(self) -> str:
        return f"{self.name} finished"


class Passed(TestResult):
    def __str__(self) -> str:
        return f"{self.name} passed"


class Failed(TestResult):
    def __init__(
        self,
        name: str,
        duration_sec: float = 0.0,
        xml_data: list[str] | None = None,
        errors: list[tuple[str, str]] | None = None,
        failures: list[tuple[str, str]] | None = None,
    ) -> None:
        super().__init__(name, duration_sec=duration_sec, xml_data=xml_data)
        self.errors = errors
        self.failures = failures

    def __str__(self) -> str:
        if self.errors and self.failures:
            le = len(self.errors)
            lf = len(self.failures)
            error_s = "error" + ("s" if le > 1 else "")
            failure_s = "failure" + ("s" if lf > 1 else "")
            return f"{self.name} failed ({le} {error_s}, {lf} {failure_s})"

        if self.errors:
            le = len(self.errors)
            error_s = "error" + ("s" if le > 1 else "")
            return f"{self.name} failed ({le} {error_s})"

        if self.failures:
            lf = len(self.failures)
            failure_s = "failure" + ("s" if lf > 1 else "")
            return f"{self.name} failed ({lf} {failure_s})"

        return f"{self.name} failed"


class UncaughtException(Failed):
    def __str__(self) -> str:
        return f"{self.name} failed (uncaught exception)"


class EnvChanged(Failed):
    def __str__(self) -> str:
        return f"{self.name} failed (env changed)"


class RefLeak(Failed):
    def __str__(self) -> str:
        return f"{self.name} failed (reference leak)"


class Skipped(TestResult):
    def __str__(self) -> str:
        return f"{self.name} skipped"


class ResourceDenied(Skipped):
    def __str__(self) -> str:
        return f"{self.name} skipped (resource denied)"


class Interrupted(TestResult):
    def __str__(self) -> str:
        return f"{self.name} interrupted"


class ChildError(Failed):
    def __str__(self) -> str:
        return f"{self.name} crashed"


class DidNotRun(TestResult):
    def __str__(self) -> str:
        return f"{self.name} ran no tests"


class Timeout(Failed):
    def __str__(self) -> str:
        return f"{self.name} timed out ({format_duration(self.duration_sec)})"


# Minimum duration of a test to display its duration or to mention that
# the test is running in background
PROGRESS_MIN_TIME = 30.0   # seconds

# small set of tests to determine if we have a basically functioning interpreter
# (i.e. if any of these fail, then anything else is likely to follow)
STDTESTS = [
    'test_grammar',
    'test_opcodes',
    'test_dict',
    'test_builtin',
    'test_exceptions',
    'test_types',
    'test_unittest',
    'test_doctest',
    'test_doctest2',
    'test_support'
]

# set of tests that we don't want to be executed when using regrtest
NOTTESTS = set()


# used by --findleaks, store for gc.garbage
FOUND_GARBAGE = []


def is_failed(result: TestResult, ns: Namespace) -> bool:
    if isinstance(result, EnvChanged):
        return ns.fail_env_changed
    return isinstance(result, Failed)


def findtestdir(path=None):
    return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir


def findtests(testdir=None, stdtests=STDTESTS, nottests=NOTTESTS):
    """Return a list of all applicable test modules."""
    testdir = findtestdir(testdir)
    names = os.listdir(testdir)
    tests = []
    others = set(stdtests) | nottests
    for name in names:
        mod, ext = os.path.splitext(name)
        if mod[:5] == "test_" and ext in (".py", "") and mod not in others:
            tests.append(mod)
    return stdtests + sorted(tests)


def get_abs_module(ns: Namespace, test_name: str) -> str:
    if test_name.startswith('test.') or ns.testdir:
        return test_name
    else:
        # Import it from the test package
        return 'test.' + test_name


def _runtest(ns: Namespace, test_name: str) -> TestResult:
    # Handle faulthandler timeout, capture stdout+stderr, XML serialization
    # and measure time.

    output_on_failure = ns.verbose3

    use_timeout = (ns.timeout is not None)
    if use_timeout:
        faulthandler.dump_traceback_later(ns.timeout, exit=True)

    start_time = time.perf_counter()
    try:
        support.set_match_tests(ns.match_tests, ns.ignore_tests)
        support.junit_xml_list = xml_list = [] if ns.xmlpath else None
        if ns.failfast:
            support.failfast = True

        if output_on_failure:
            support.verbose = True

            stream = io.StringIO()
            orig_stdout = sys.stdout
            orig_stderr = sys.stderr
            try:
                sys.stdout = stream
                sys.stderr = stream
                result = _runtest_inner(ns, test_name,
                                        display_failure=False)
                if not isinstance(result, Passed):
                    output = stream.getvalue()
                    orig_stderr.write(output)
                    orig_stderr.flush()
            finally:
                sys.stdout = orig_stdout
                sys.stderr = orig_stderr
        else:
            # Tell tests to be moderately quiet
            support.verbose = ns.verbose

            result = _runtest_inner(ns, test_name,
                                    display_failure=not ns.verbose)

        if xml_list:
            import xml.etree.ElementTree as ET
            result.xml_data = [
                ET.tostring(x).decode('us-ascii')
                for x in xml_list
            ]

        result.duration_sec = time.perf_counter() - start_time
        return result
    finally:
        if use_timeout:
            faulthandler.cancel_dump_traceback_later()
        support.junit_xml_list = None


def runtest(ns: Namespace, test_name: str) -> TestResult:
    """Run a single test.

    ns -- regrtest namespace of options
    test_name -- the name of the test

    Returns a TestResult sub-class depending on the kind of result received.

    If ns.xmlpath is not None, xml_data is a list containing each
    generated testsuite element.
    """
    try:
        return _runtest(ns, test_name)
    except:
        if not ns.pgo:
            msg = traceback.format_exc()
            print(f"test {test_name} crashed -- {msg}",
                  file=sys.stderr, flush=True)
        return Failed(test_name)


def _test_module(the_module):
    loader = unittest.TestLoader()
    tests = loader.loadTestsFromModule(the_module)
    for error in loader.errors:
        print(error, file=sys.stderr)
    if loader.errors:
        raise Exception("errors while loading tests")
    support.run_unittest(tests)


def save_env(ns: Namespace, test_name: str):
    return saved_test_environment(test_name, ns.verbose, ns.quiet, pgo=ns.pgo)


def _runtest_inner2(ns: Namespace, test_name: str) -> bool:
    # Load the test function, run the test function, handle huntrleaks
    # and findleaks to detect leaks

    abstest = get_abs_module(ns, test_name)

    # remove the module from sys.module to reload it if it was already imported
    try:
        del sys.modules[abstest]
    except KeyError:
        pass

    the_module = importlib.import_module(abstest)

    if ns.huntrleaks:
        from test.libregrtest.refleak import dash_R

    # If the test has a test_main, that will run the appropriate
    # tests.  If not, use normal unittest test loading.
    test_runner = getattr(the_module, "test_main", None)
    if test_runner is None:
        test_runner = functools.partial(_test_module, the_module)

    try:
        with save_env(ns, test_name):
            if ns.huntrleaks:
                # Return True if the test leaked references
                refleak = dash_R(ns, test_name, test_runner)
            else:
                test_runner()
                refleak = False
    finally:
        cleanup_test_droppings(test_name, ns.verbose)

    support.gc_collect()

    if gc.garbage:
        support.environment_altered = True
        print_warning(f"{test_name} created {len(gc.garbage)} "
                      f"uncollectable object(s).")

        # move the uncollectable objects somewhere,
        # so we don't see them again
        FOUND_GARBAGE.extend(gc.garbage)
        gc.garbage.clear()

    support.reap_children()

    return refleak


def _runtest_inner(
    ns: Namespace, test_name: str, display_failure: bool = True
) -> TestResult:
    # Detect environment changes, handle exceptions.

    # Reset the environment_altered flag to detect if a test altered
    # the environment
    support.environment_altered = False

    if ns.pgo:
        display_failure = False

    try:
        clear_caches()

        with save_env(ns, test_name):
            refleak = _runtest_inner2(ns, test_name)
    except support.ResourceDenied as msg:
        if not ns.quiet and not ns.pgo:
            print(f"{test_name} skipped -- {msg}", flush=True)
        return ResourceDenied(test_name)
    except unittest.SkipTest as msg:
        if not ns.quiet and not ns.pgo:
            print(f"{test_name} skipped -- {msg}", flush=True)
        return Skipped(test_name)
    except support.TestFailedWithDetails as exc:
        msg = f"test {test_name} failed"
        if display_failure:
            msg = f"{msg} -- {exc}"
        print(msg, file=sys.stderr, flush=True)
        return Failed(test_name, errors=exc.errors, failures=exc.failures)
    except support.TestFailed as exc:
        msg = f"test {test_name} failed"
        if display_failure:
            msg = f"{msg} -- {exc}"
        print(msg, file=sys.stderr, flush=True)
        return Failed(test_name)
    except support.TestDidNotRun:
        return DidNotRun(test_name)
    except KeyboardInterrupt:
        print()
        return Interrupted(test_name)
    except:
        if not ns.pgo:
            msg = traceback.format_exc()
            print(f"test {test_name} crashed -- {msg}",
                  file=sys.stderr, flush=True)
        return UncaughtException(test_name)

    if refleak:
        return RefLeak(test_name)
    if support.environment_altered:
        return EnvChanged(test_name)
    return Passed(test_name)


def cleanup_test_droppings(test_name: str, verbose: int) -> None:
    # First kill any dangling references to open files etc.
    # This can also issue some ResourceWarnings which would otherwise get
    # triggered during the following test run, and possibly produce failures.
    support.gc_collect()

    # Try to clean up junk commonly left behind.  While tests shouldn't leave
    # any files or directories behind, when a test fails that can be tedious
    # for it to arrange.  The consequences can be especially nasty on Windows,
    # since if a test leaves a file open, it cannot be deleted by name (while
    # there's nothing we can do about that here either, we can display the
    # name of the offending test, which is a real help).
    for name in (os_helper.TESTFN,):
        if not os.path.exists(name):
            continue

        if os.path.isdir(name):
            import shutil
            kind, nuker = "directory", shutil.rmtree
        elif os.path.isfile(name):
            kind, nuker = "file", os.unlink
        else:
            raise RuntimeError(f"os.path says {name!r} exists but is neither "
                               f"directory nor file")

        if verbose:
            print_warning(f"{test_name} left behind {kind} {name!r}")
            support.environment_altered = True

        try:
            import stat
            # fix possible permissions problems that might prevent cleanup
            os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
            nuker(name)
        except Exception as exc:
            print_warning(f"{test_name} left behind {kind} {name!r} "
                          f"and it couldn't be removed: {exc}")