diff options
author | Victor Stinner <victor.stinner@gmail.com> | 2014-08-10 17:50:08 (GMT) |
---|---|---|
committer | Victor Stinner <victor.stinner@gmail.com> | 2014-08-10 17:50:08 (GMT) |
commit | 6d201685e490138610e8e6fc321bbca6589c3cc2 (patch) | |
tree | e50c14e3683f132f715b3e564864ebf3ec279e84 | |
parent | ac191ce1d38ba7f9c16b6486685abe1afc39c2ea (diff) | |
download | cpython-6d201685e490138610e8e6fc321bbca6589c3cc2.zip cpython-6d201685e490138610e8e6fc321bbca6589c3cc2.tar.gz cpython-6d201685e490138610e8e6fc321bbca6589c3cc2.tar.bz2 |
Close #22175: Improve test_faulthandler readability with dedent.
Patch written by Xavier de Gaye.
-rw-r--r-- | Lib/test/test_faulthandler.py | 350 |
1 files changed, 176 insertions, 174 deletions
diff --git a/Lib/test/test_faulthandler.py b/Lib/test/test_faulthandler.py index b0fc279..41ccd1e 100644 --- a/Lib/test/test_faulthandler.py +++ b/Lib/test/test_faulthandler.py @@ -10,6 +10,7 @@ from test import support, script_helper from test.script_helper import assert_python_ok import tempfile import unittest +from textwrap import dedent try: import threading @@ -47,6 +48,7 @@ class FaultHandlerTests(unittest.TestCase): build, and replace "Current thread 0x00007f8d8fbd9700" by "Current thread XXX". """ + code = dedent(code).strip() with support.SuppressCrashReport(): process = script_helper.spawn_python('-c', code) stdout, stderr = process.communicate() @@ -76,15 +78,15 @@ class FaultHandlerTests(unittest.TestCase): else: header = 'Stack (most recent call first)' regex = """ -^Fatal Python error: {name} + ^Fatal Python error: {name} -{header}: - File "<string>", line {lineno} in <module> -""".strip() - regex = regex.format( + {header}: + File "<string>", line {lineno} in <module> + """ + regex = dedent(regex.format( lineno=line_number, name=name_regex, - header=re.escape(header)) + header=re.escape(header))).strip() if other_regex: regex += '|' + other_regex output, exitcode = self.get_output(code, filename) @@ -96,29 +98,29 @@ class FaultHandlerTests(unittest.TestCase): "the first page of memory is a mapped read-only on AIX") def test_read_null(self): self.check_fatal_error(""" -import faulthandler -faulthandler.enable() -faulthandler._read_null() -""".strip(), + import faulthandler + faulthandler.enable() + faulthandler._read_null() + """, 3, # Issue #12700: Read NULL raises SIGILL on Mac OS X Lion '(?:Segmentation fault|Bus error|Illegal instruction)') def test_sigsegv(self): self.check_fatal_error(""" -import faulthandler -faulthandler.enable() -faulthandler._sigsegv() -""".strip(), + import faulthandler + faulthandler.enable() + faulthandler._sigsegv() + """, 3, 'Segmentation fault') def test_sigabrt(self): self.check_fatal_error(""" -import faulthandler -faulthandler.enable() -faulthandler._sigabrt() -""".strip(), + import faulthandler + faulthandler.enable() + faulthandler._sigabrt() + """, 3, 'Aborted') @@ -126,10 +128,10 @@ faulthandler._sigabrt() "SIGFPE cannot be caught on Windows") def test_sigfpe(self): self.check_fatal_error(""" -import faulthandler -faulthandler.enable() -faulthandler._sigfpe() -""".strip(), + import faulthandler + faulthandler.enable() + faulthandler._sigfpe() + """, 3, 'Floating point exception') @@ -137,10 +139,10 @@ faulthandler._sigfpe() "need faulthandler._sigbus()") def test_sigbus(self): self.check_fatal_error(""" -import faulthandler -faulthandler.enable() -faulthandler._sigbus() -""".strip(), + import faulthandler + faulthandler.enable() + faulthandler._sigbus() + """, 3, 'Bus error') @@ -148,18 +150,18 @@ faulthandler._sigbus() "need faulthandler._sigill()") def test_sigill(self): self.check_fatal_error(""" -import faulthandler -faulthandler.enable() -faulthandler._sigill() -""".strip(), + import faulthandler + faulthandler.enable() + faulthandler._sigill() + """, 3, 'Illegal instruction') def test_fatal_error(self): self.check_fatal_error(""" -import faulthandler -faulthandler._fatal_error(b'xyz') -""".strip(), + import faulthandler + faulthandler._fatal_error(b'xyz') + """, 2, 'xyz') @@ -170,52 +172,52 @@ faulthandler._fatal_error(b'xyz') 'need faulthandler._stack_overflow()') def test_stack_overflow(self): self.check_fatal_error(""" -import faulthandler -faulthandler.enable() -faulthandler._stack_overflow() -""".strip(), + import faulthandler + faulthandler.enable() + faulthandler._stack_overflow() + """, 3, '(?:Segmentation fault|Bus error)', other_regex='unable to raise a stack overflow') def test_gil_released(self): self.check_fatal_error(""" -import faulthandler -faulthandler.enable() -faulthandler._read_null(True) -""".strip(), + import faulthandler + faulthandler.enable() + faulthandler._read_null(True) + """, 3, '(?:Segmentation fault|Bus error|Illegal instruction)') def test_enable_file(self): with temporary_filename() as filename: self.check_fatal_error(""" -import faulthandler -output = open({filename}, 'wb') -faulthandler.enable(output) -faulthandler._sigsegv() -""".strip().format(filename=repr(filename)), + import faulthandler + output = open({filename}, 'wb') + faulthandler.enable(output) + faulthandler._sigsegv() + """.format(filename=repr(filename)), 4, 'Segmentation fault', filename=filename) def test_enable_single_thread(self): self.check_fatal_error(""" -import faulthandler -faulthandler.enable(all_threads=False) -faulthandler._sigsegv() -""".strip(), + import faulthandler + faulthandler.enable(all_threads=False) + faulthandler._sigsegv() + """, 3, 'Segmentation fault', all_threads=False) def test_disable(self): code = """ -import faulthandler -faulthandler.enable() -faulthandler.disable() -faulthandler._sigsegv() -""".strip() + import faulthandler + faulthandler.enable() + faulthandler.disable() + faulthandler._sigsegv() + """ not_expected = 'Fatal Python error' stderr, exitcode = self.get_output(code) stder = '\n'.join(stderr) @@ -283,20 +285,20 @@ faulthandler._sigsegv() Raise an error if the output doesn't match the expected format. """ code = """ -import faulthandler + import faulthandler -def funcB(): - if {has_filename}: - with open({filename}, "wb") as fp: - faulthandler.dump_traceback(fp, all_threads=False) - else: - faulthandler.dump_traceback(all_threads=False) + def funcB(): + if {has_filename}: + with open({filename}, "wb") as fp: + faulthandler.dump_traceback(fp, all_threads=False) + else: + faulthandler.dump_traceback(all_threads=False) -def funcA(): - funcB() + def funcA(): + funcB() -funcA() -""".strip() + funcA() + """ code = code.format( filename=repr(filename), has_filename=bool(filename), @@ -327,13 +329,13 @@ funcA() func_name = 'x' * (maxlen + 50) truncated = 'x' * maxlen + '...' code = """ -import faulthandler + import faulthandler -def {func_name}(): - faulthandler.dump_traceback(all_threads=False) + def {func_name}(): + faulthandler.dump_traceback(all_threads=False) -{func_name}() -""".strip() + {func_name}() + """ code = code.format( func_name=func_name, ) @@ -353,37 +355,37 @@ def {func_name}(): Raise an error if the output doesn't match the expected format. """ code = """ -import faulthandler -from threading import Thread, Event -import time - -def dump(): - if {filename}: - with open({filename}, "wb") as fp: - faulthandler.dump_traceback(fp, all_threads=True) - else: - faulthandler.dump_traceback(all_threads=True) - -class Waiter(Thread): - # avoid blocking if the main thread raises an exception. - daemon = True - - def __init__(self): - Thread.__init__(self) - self.running = Event() - self.stop = Event() - - def run(self): - self.running.set() - self.stop.wait() - -waiter = Waiter() -waiter.start() -waiter.running.wait() -dump() -waiter.stop.set() -waiter.join() -""".strip() + import faulthandler + from threading import Thread, Event + import time + + def dump(): + if {filename}: + with open({filename}, "wb") as fp: + faulthandler.dump_traceback(fp, all_threads=True) + else: + faulthandler.dump_traceback(all_threads=True) + + class Waiter(Thread): + # avoid blocking if the main thread raises an exception. + daemon = True + + def __init__(self): + Thread.__init__(self) + self.running = Event() + self.stop = Event() + + def run(self): + self.running.set() + self.stop.wait() + + waiter = Waiter() + waiter.start() + waiter.running.wait() + dump() + waiter.stop.set() + waiter.join() + """ code = code.format(filename=repr(filename)) output, exitcode = self.get_output(code, filename) output = '\n'.join(output) @@ -392,17 +394,17 @@ waiter.join() else: lineno = 10 regex = """ -^Thread 0x[0-9a-f]+ \(most recent call first\): -(?: File ".*threading.py", line [0-9]+ in [_a-z]+ -){{1,3}} File "<string>", line 23 in run - File ".*threading.py", line [0-9]+ in _bootstrap_inner - File ".*threading.py", line [0-9]+ in _bootstrap - -Current thread XXX \(most recent call first\): - File "<string>", line {lineno} in dump - File "<string>", line 28 in <module>$ -""".strip() - regex = regex.format(lineno=lineno) + ^Thread 0x[0-9a-f]+ \(most recent call first\): + (?: File ".*threading.py", line [0-9]+ in [_a-z]+ + ){{1,3}} File "<string>", line 23 in run + File ".*threading.py", line [0-9]+ in _bootstrap_inner + File ".*threading.py", line [0-9]+ in _bootstrap + + Current thread XXX \(most recent call first\): + File "<string>", line {lineno} in dump + File "<string>", line 28 in <module>$ + """ + regex = dedent(regex.format(lineno=lineno)).strip() self.assertRegex(output, regex) self.assertEqual(exitcode, 0) @@ -423,29 +425,29 @@ Current thread XXX \(most recent call first\): """ timeout_str = str(datetime.timedelta(seconds=TIMEOUT)) code = """ -import faulthandler -import time - -def func(timeout, repeat, cancel, file, loops): - for loop in range(loops): - faulthandler.dump_traceback_later(timeout, repeat=repeat, file=file) - if cancel: - faulthandler.cancel_dump_traceback_later() - time.sleep(timeout * 5) - faulthandler.cancel_dump_traceback_later() - -timeout = {timeout} -repeat = {repeat} -cancel = {cancel} -loops = {loops} -if {has_filename}: - file = open({filename}, "wb") -else: - file = None -func(timeout, repeat, cancel, file, loops) -if file is not None: - file.close() -""".strip() + import faulthandler + import time + + def func(timeout, repeat, cancel, file, loops): + for loop in range(loops): + faulthandler.dump_traceback_later(timeout, repeat=repeat, file=file) + if cancel: + faulthandler.cancel_dump_traceback_later() + time.sleep(timeout * 5) + faulthandler.cancel_dump_traceback_later() + + timeout = {timeout} + repeat = {repeat} + cancel = {cancel} + loops = {loops} + if {has_filename}: + file = open({filename}, "wb") + else: + file = None + func(timeout, repeat, cancel, file, loops) + if file is not None: + file.close() + """ code = code.format( timeout=TIMEOUT, repeat=repeat, @@ -512,45 +514,45 @@ if file is not None: """ signum = signal.SIGUSR1 code = """ -import faulthandler -import os -import signal -import sys + import faulthandler + import os + import signal + import sys -def func(signum): - os.kill(os.getpid(), signum) - -def handler(signum, frame): - handler.called = True -handler.called = False - -exitcode = 0 -signum = {signum} -unregister = {unregister} -chain = {chain} - -if {has_filename}: - file = open({filename}, "wb") -else: - file = None -if chain: - signal.signal(signum, handler) -faulthandler.register(signum, file=file, - all_threads={all_threads}, chain={chain}) -if unregister: - faulthandler.unregister(signum) -func(signum) -if chain and not handler.called: - if file is not None: - output = file - else: - output = sys.stderr - print("Error: signal handler not called!", file=output) - exitcode = 1 -if file is not None: - file.close() -sys.exit(exitcode) -""".strip() + def func(signum): + os.kill(os.getpid(), signum) + + def handler(signum, frame): + handler.called = True + handler.called = False + + exitcode = 0 + signum = {signum} + unregister = {unregister} + chain = {chain} + + if {has_filename}: + file = open({filename}, "wb") + else: + file = None + if chain: + signal.signal(signum, handler) + faulthandler.register(signum, file=file, + all_threads={all_threads}, chain={chain}) + if unregister: + faulthandler.unregister(signum) + func(signum) + if chain and not handler.called: + if file is not None: + output = file + else: + output = sys.stderr + print("Error: signal handler not called!", file=output) + exitcode = 1 + if file is not None: + file.close() + sys.exit(exitcode) + """ code = code.format( filename=repr(filename), has_filename=bool(filename), |