summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_faulthandler.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_faulthandler.py')
-rw-r--r--Lib/test/test_faulthandler.py458
1 files changed, 246 insertions, 212 deletions
diff --git a/Lib/test/test_faulthandler.py b/Lib/test/test_faulthandler.py
index 770e70c..e68a09e 100644
--- a/Lib/test/test_faulthandler.py
+++ b/Lib/test/test_faulthandler.py
@@ -10,6 +10,7 @@ from test import support, script_helper
from test.script_helper import assert_python_ok
import tempfile
import unittest
+from textwrap import dedent
try:
import threading
@@ -19,18 +20,6 @@ except ImportError:
TIMEOUT = 0.5
-try:
- from resource import setrlimit, RLIMIT_CORE, error as resource_error
-except ImportError:
- prepare_subprocess = None
-else:
- def prepare_subprocess():
- # don't create core file
- try:
- setrlimit(RLIMIT_CORE, (0, 0))
- except (ValueError, resource_error):
- pass
-
def expected_traceback(lineno1, lineno2, header, min_count=1):
regex = header
regex += ' File "<string>", line %s in func\n' % lineno1
@@ -59,10 +48,9 @@ class FaultHandlerTests(unittest.TestCase):
build, and replace "Current thread 0x00007f8d8fbd9700" by "Current
thread XXX".
"""
- options = {}
- if prepare_subprocess:
- options['preexec_fn'] = prepare_subprocess
- process = script_helper.spawn_python('-c', code, **options)
+ code = dedent(code).strip()
+ with support.SuppressCrashReport():
+ process = script_helper.spawn_python('-c', code)
stdout, stderr = process.communicate()
exitcode = process.wait()
output = support.strip_python_stderr(stdout)
@@ -86,23 +74,22 @@ class FaultHandlerTests(unittest.TestCase):
Raise an error if the output doesn't match the expected format.
"""
if all_threads:
- header = 'Current thread XXX'
+ header = 'Current thread XXX (most recent call first)'
else:
- header = 'Traceback (most recent call first)'
+ header = 'Stack (most recent call first)'
regex = """
-^Fatal Python error: {name}
+ ^Fatal Python error: {name}
-{header}:
- File "<string>", line {lineno} in <module>
-""".strip()
- regex = regex.format(
+ {header}:
+ File "<string>", line {lineno} in <module>
+ """
+ regex = dedent(regex.format(
lineno=line_number,
name=name_regex,
- header=re.escape(header))
+ header=re.escape(header))).strip()
if other_regex:
regex += '|' + other_regex
- with support.suppress_crash_popup():
- output, exitcode = self.get_output(code, filename)
+ output, exitcode = self.get_output(code, filename)
output = '\n'.join(output)
self.assertRegex(output, regex)
self.assertNotEqual(exitcode, 0)
@@ -111,29 +98,29 @@ class FaultHandlerTests(unittest.TestCase):
"the first page of memory is a mapped read-only on AIX")
def test_read_null(self):
self.check_fatal_error("""
-import faulthandler
-faulthandler.enable()
-faulthandler._read_null()
-""".strip(),
+ import faulthandler
+ faulthandler.enable()
+ faulthandler._read_null()
+ """,
3,
# Issue #12700: Read NULL raises SIGILL on Mac OS X Lion
'(?:Segmentation fault|Bus error|Illegal instruction)')
def test_sigsegv(self):
self.check_fatal_error("""
-import faulthandler
-faulthandler.enable()
-faulthandler._sigsegv()
-""".strip(),
+ import faulthandler
+ faulthandler.enable()
+ faulthandler._sigsegv()
+ """,
3,
'Segmentation fault')
def test_sigabrt(self):
self.check_fatal_error("""
-import faulthandler
-faulthandler.enable()
-faulthandler._sigabrt()
-""".strip(),
+ import faulthandler
+ faulthandler.enable()
+ faulthandler._sigabrt()
+ """,
3,
'Aborted')
@@ -141,10 +128,10 @@ faulthandler._sigabrt()
"SIGFPE cannot be caught on Windows")
def test_sigfpe(self):
self.check_fatal_error("""
-import faulthandler
-faulthandler.enable()
-faulthandler._sigfpe()
-""".strip(),
+ import faulthandler
+ faulthandler.enable()
+ faulthandler._sigfpe()
+ """,
3,
'Floating point exception')
@@ -152,10 +139,10 @@ faulthandler._sigfpe()
"need faulthandler._sigbus()")
def test_sigbus(self):
self.check_fatal_error("""
-import faulthandler
-faulthandler.enable()
-faulthandler._sigbus()
-""".strip(),
+ import faulthandler
+ faulthandler.enable()
+ faulthandler._sigbus()
+ """,
3,
'Bus error')
@@ -163,18 +150,18 @@ faulthandler._sigbus()
"need faulthandler._sigill()")
def test_sigill(self):
self.check_fatal_error("""
-import faulthandler
-faulthandler.enable()
-faulthandler._sigill()
-""".strip(),
+ import faulthandler
+ faulthandler.enable()
+ faulthandler._sigill()
+ """,
3,
'Illegal instruction')
def test_fatal_error(self):
self.check_fatal_error("""
-import faulthandler
-faulthandler._fatal_error(b'xyz')
-""".strip(),
+ import faulthandler
+ faulthandler._fatal_error(b'xyz')
+ """,
2,
'xyz')
@@ -185,56 +172,55 @@ faulthandler._fatal_error(b'xyz')
'need faulthandler._stack_overflow()')
def test_stack_overflow(self):
self.check_fatal_error("""
-import faulthandler
-faulthandler.enable()
-faulthandler._stack_overflow()
-""".strip(),
+ import faulthandler
+ faulthandler.enable()
+ faulthandler._stack_overflow()
+ """,
3,
'(?:Segmentation fault|Bus error)',
other_regex='unable to raise a stack overflow')
def test_gil_released(self):
self.check_fatal_error("""
-import faulthandler
-faulthandler.enable()
-faulthandler._read_null(True)
-""".strip(),
+ import faulthandler
+ faulthandler.enable()
+ faulthandler._sigsegv(True)
+ """,
3,
- '(?:Segmentation fault|Bus error|Illegal instruction)')
+ 'Segmentation fault')
def test_enable_file(self):
with temporary_filename() as filename:
self.check_fatal_error("""
-import faulthandler
-output = open({filename}, 'wb')
-faulthandler.enable(output)
-faulthandler._sigsegv()
-""".strip().format(filename=repr(filename)),
+ import faulthandler
+ output = open({filename}, 'wb')
+ faulthandler.enable(output)
+ faulthandler._sigsegv()
+ """.format(filename=repr(filename)),
4,
'Segmentation fault',
filename=filename)
def test_enable_single_thread(self):
self.check_fatal_error("""
-import faulthandler
-faulthandler.enable(all_threads=False)
-faulthandler._sigsegv()
-""".strip(),
+ import faulthandler
+ faulthandler.enable(all_threads=False)
+ faulthandler._sigsegv()
+ """,
3,
'Segmentation fault',
all_threads=False)
def test_disable(self):
code = """
-import faulthandler
-faulthandler.enable()
-faulthandler.disable()
-faulthandler._sigsegv()
-""".strip()
+ import faulthandler
+ faulthandler.enable()
+ faulthandler.disable()
+ faulthandler._sigsegv()
+ """
not_expected = 'Fatal Python error'
- with support.suppress_crash_popup():
- stderr, exitcode = self.get_output(code)
- stder = '\n'.join(stderr)
+ stderr, exitcode = self.get_output(code)
+ stderr = '\n'.join(stderr)
self.assertTrue(not_expected not in stderr,
"%r is present in %r" % (not_expected, stderr))
self.assertNotEqual(exitcode, 0)
@@ -264,16 +250,42 @@ faulthandler._sigsegv()
def test_disabled_by_default(self):
# By default, the module should be disabled
code = "import faulthandler; print(faulthandler.is_enabled())"
- rc, stdout, stderr = assert_python_ok("-c", code)
- stdout = (stdout + stderr).strip()
- self.assertEqual(stdout, b"False")
+ args = filter(None, (sys.executable,
+ "-E" if sys.flags.ignore_environment else "",
+ "-c", code))
+ env = os.environ.copy()
+ env.pop("PYTHONFAULTHANDLER", None)
+ # don't use assert_python_ok() because it always enables faulthandler
+ output = subprocess.check_output(args, env=env)
+ self.assertEqual(output.rstrip(), b"False")
def test_sys_xoptions(self):
# Test python -X faulthandler
code = "import faulthandler; print(faulthandler.is_enabled())"
- rc, stdout, stderr = assert_python_ok("-X", "faulthandler", "-c", code)
- stdout = (stdout + stderr).strip()
- self.assertEqual(stdout, b"True")
+ args = filter(None, (sys.executable,
+ "-E" if sys.flags.ignore_environment else "",
+ "-X", "faulthandler", "-c", code))
+ env = os.environ.copy()
+ env.pop("PYTHONFAULTHANDLER", None)
+ # don't use assert_python_ok() because it always enables faulthandler
+ output = subprocess.check_output(args, env=env)
+ self.assertEqual(output.rstrip(), b"True")
+
+ def test_env_var(self):
+ # empty env var
+ code = "import faulthandler; print(faulthandler.is_enabled())"
+ args = (sys.executable, "-c", code)
+ env = os.environ.copy()
+ env['PYTHONFAULTHANDLER'] = ''
+ # don't use assert_python_ok() because it always enables faulthandler
+ output = subprocess.check_output(args, env=env)
+ self.assertEqual(output.rstrip(), b"False")
+
+ # non-empty env var
+ env = os.environ.copy()
+ env['PYTHONFAULTHANDLER'] = '1'
+ output = subprocess.check_output(args, env=env)
+ self.assertEqual(output.rstrip(), b"True")
def check_dump_traceback(self, filename):
"""
@@ -281,20 +293,20 @@ faulthandler._sigsegv()
Raise an error if the output doesn't match the expected format.
"""
code = """
-import faulthandler
+ import faulthandler
-def funcB():
- if {has_filename}:
- with open({filename}, "wb") as fp:
- faulthandler.dump_traceback(fp, all_threads=False)
- else:
- faulthandler.dump_traceback(all_threads=False)
+ def funcB():
+ if {has_filename}:
+ with open({filename}, "wb") as fp:
+ faulthandler.dump_traceback(fp, all_threads=False)
+ else:
+ faulthandler.dump_traceback(all_threads=False)
-def funcA():
- funcB()
+ def funcA():
+ funcB()
-funcA()
-""".strip()
+ funcA()
+ """
code = code.format(
filename=repr(filename),
has_filename=bool(filename),
@@ -304,7 +316,7 @@ funcA()
else:
lineno = 8
expected = [
- 'Traceback (most recent call first):',
+ 'Stack (most recent call first):',
' File "<string>", line %s in funcB' % lineno,
' File "<string>", line 11 in funcA',
' File "<string>", line 13 in <module>'
@@ -325,18 +337,18 @@ funcA()
func_name = 'x' * (maxlen + 50)
truncated = 'x' * maxlen + '...'
code = """
-import faulthandler
+ import faulthandler
-def {func_name}():
- faulthandler.dump_traceback(all_threads=False)
+ def {func_name}():
+ faulthandler.dump_traceback(all_threads=False)
-{func_name}()
-""".strip()
+ {func_name}()
+ """
code = code.format(
func_name=func_name,
)
expected = [
- 'Traceback (most recent call first):',
+ 'Stack (most recent call first):',
' File "<string>", line 4 in %s' % truncated,
' File "<string>", line 6 in <module>'
]
@@ -351,37 +363,37 @@ def {func_name}():
Raise an error if the output doesn't match the expected format.
"""
code = """
-import faulthandler
-from threading import Thread, Event
-import time
-
-def dump():
- if {filename}:
- with open({filename}, "wb") as fp:
- faulthandler.dump_traceback(fp, all_threads=True)
- else:
- faulthandler.dump_traceback(all_threads=True)
-
-class Waiter(Thread):
- # avoid blocking if the main thread raises an exception.
- daemon = True
-
- def __init__(self):
- Thread.__init__(self)
- self.running = Event()
- self.stop = Event()
-
- def run(self):
- self.running.set()
- self.stop.wait()
-
-waiter = Waiter()
-waiter.start()
-waiter.running.wait()
-dump()
-waiter.stop.set()
-waiter.join()
-""".strip()
+ import faulthandler
+ from threading import Thread, Event
+ import time
+
+ def dump():
+ if {filename}:
+ with open({filename}, "wb") as fp:
+ faulthandler.dump_traceback(fp, all_threads=True)
+ else:
+ faulthandler.dump_traceback(all_threads=True)
+
+ class Waiter(Thread):
+ # avoid blocking if the main thread raises an exception.
+ daemon = True
+
+ def __init__(self):
+ Thread.__init__(self)
+ self.running = Event()
+ self.stop = Event()
+
+ def run(self):
+ self.running.set()
+ self.stop.wait()
+
+ waiter = Waiter()
+ waiter.start()
+ waiter.running.wait()
+ dump()
+ waiter.stop.set()
+ waiter.join()
+ """
code = code.format(filename=repr(filename))
output, exitcode = self.get_output(code, filename)
output = '\n'.join(output)
@@ -390,17 +402,17 @@ waiter.join()
else:
lineno = 10
regex = """
-^Thread 0x[0-9a-f]+:
-(?: File ".*threading.py", line [0-9]+ in [_a-z]+
-){{1,3}} File "<string>", line 23 in run
- File ".*threading.py", line [0-9]+ in _bootstrap_inner
- File ".*threading.py", line [0-9]+ in _bootstrap
-
-Current thread XXX:
- File "<string>", line {lineno} in dump
- File "<string>", line 28 in <module>$
-""".strip()
- regex = regex.format(lineno=lineno)
+ ^Thread 0x[0-9a-f]+ \(most recent call first\):
+ (?: File ".*threading.py", line [0-9]+ in [_a-z]+
+ ){{1,3}} File "<string>", line 23 in run
+ File ".*threading.py", line [0-9]+ in _bootstrap_inner
+ File ".*threading.py", line [0-9]+ in _bootstrap
+
+ Current thread XXX \(most recent call first\):
+ File "<string>", line {lineno} in dump
+ File "<string>", line 28 in <module>$
+ """
+ regex = dedent(regex.format(lineno=lineno)).strip()
self.assertRegex(output, regex)
self.assertEqual(exitcode, 0)
@@ -421,29 +433,29 @@ Current thread XXX:
"""
timeout_str = str(datetime.timedelta(seconds=TIMEOUT))
code = """
-import faulthandler
-import time
-
-def func(timeout, repeat, cancel, file, loops):
- for loop in range(loops):
- faulthandler.dump_traceback_later(timeout, repeat=repeat, file=file)
- if cancel:
- faulthandler.cancel_dump_traceback_later()
- time.sleep(timeout * 5)
- faulthandler.cancel_dump_traceback_later()
-
-timeout = {timeout}
-repeat = {repeat}
-cancel = {cancel}
-loops = {loops}
-if {has_filename}:
- file = open({filename}, "wb")
-else:
- file = None
-func(timeout, repeat, cancel, file, loops)
-if file is not None:
- file.close()
-""".strip()
+ import faulthandler
+ import time
+
+ def func(timeout, repeat, cancel, file, loops):
+ for loop in range(loops):
+ faulthandler.dump_traceback_later(timeout, repeat=repeat, file=file)
+ if cancel:
+ faulthandler.cancel_dump_traceback_later()
+ time.sleep(timeout * 5)
+ faulthandler.cancel_dump_traceback_later()
+
+ timeout = {timeout}
+ repeat = {repeat}
+ cancel = {cancel}
+ loops = {loops}
+ if {has_filename}:
+ file = open({filename}, "wb")
+ else:
+ file = None
+ func(timeout, repeat, cancel, file, loops)
+ if file is not None:
+ file.close()
+ """
code = code.format(
timeout=TIMEOUT,
repeat=repeat,
@@ -459,7 +471,7 @@ if file is not None:
count = loops
if repeat:
count *= 2
- header = r'Timeout \(%s\)!\nThread 0x[0-9a-f]+:\n' % timeout_str
+ header = r'Timeout \(%s\)!\nThread 0x[0-9a-f]+ \(most recent call first\):\n' % timeout_str
regex = expected_traceback(9, 20, header, min_count=count)
self.assertRegex(trace, regex)
else:
@@ -510,45 +522,45 @@ if file is not None:
"""
signum = signal.SIGUSR1
code = """
-import faulthandler
-import os
-import signal
-import sys
+ import faulthandler
+ import os
+ import signal
+ import sys
-def func(signum):
- os.kill(os.getpid(), signum)
-
-def handler(signum, frame):
- handler.called = True
-handler.called = False
-
-exitcode = 0
-signum = {signum}
-unregister = {unregister}
-chain = {chain}
-
-if {has_filename}:
- file = open({filename}, "wb")
-else:
- file = None
-if chain:
- signal.signal(signum, handler)
-faulthandler.register(signum, file=file,
- all_threads={all_threads}, chain={chain})
-if unregister:
- faulthandler.unregister(signum)
-func(signum)
-if chain and not handler.called:
- if file is not None:
- output = file
- else:
- output = sys.stderr
- print("Error: signal handler not called!", file=output)
- exitcode = 1
-if file is not None:
- file.close()
-sys.exit(exitcode)
-""".strip()
+ def func(signum):
+ os.kill(os.getpid(), signum)
+
+ def handler(signum, frame):
+ handler.called = True
+ handler.called = False
+
+ exitcode = 0
+ signum = {signum}
+ unregister = {unregister}
+ chain = {chain}
+
+ if {has_filename}:
+ file = open({filename}, "wb")
+ else:
+ file = None
+ if chain:
+ signal.signal(signum, handler)
+ faulthandler.register(signum, file=file,
+ all_threads={all_threads}, chain={chain})
+ if unregister:
+ faulthandler.unregister(signum)
+ func(signum)
+ if chain and not handler.called:
+ if file is not None:
+ output = file
+ else:
+ output = sys.stderr
+ print("Error: signal handler not called!", file=output)
+ exitcode = 1
+ if file is not None:
+ file.close()
+ sys.exit(exitcode)
+ """
code = code.format(
filename=repr(filename),
has_filename=bool(filename),
@@ -561,9 +573,9 @@ sys.exit(exitcode)
trace = '\n'.join(trace)
if not unregister:
if all_threads:
- regex = 'Current thread XXX:\n'
+ regex = 'Current thread XXX \(most recent call first\):\n'
else:
- regex = 'Traceback \(most recent call first\):\n'
+ regex = 'Stack \(most recent call first\):\n'
regex = expected_traceback(7, 28, regex)
self.assertRegex(trace, regex)
else:
@@ -589,9 +601,31 @@ sys.exit(exitcode)
def test_register_chain(self):
self.check_register(chain=True)
+ @contextmanager
+ def check_stderr_none(self):
+ stderr = sys.stderr
+ try:
+ sys.stderr = None
+ with self.assertRaises(RuntimeError) as cm:
+ yield
+ self.assertEqual(str(cm.exception), "sys.stderr is None")
+ finally:
+ sys.stderr = stderr
+
+ def test_stderr_None(self):
+ # Issue #21497: provide an helpful error if sys.stderr is None,
+ # instead of just an attribute error: "None has no attribute fileno".
+ with self.check_stderr_none():
+ faulthandler.enable()
+ with self.check_stderr_none():
+ faulthandler.dump_traceback()
+ if hasattr(faulthandler, 'dump_traceback_later'):
+ with self.check_stderr_none():
+ faulthandler.dump_traceback_later(1e-3)
+ if hasattr(faulthandler, "register"):
+ with self.check_stderr_none():
+ faulthandler.register(signal.SIGUSR1)
-def test_main():
- support.run_unittest(FaultHandlerTests)
if __name__ == "__main__":
- test_main()
+ unittest.main()