From 906f0c45fd705f15996d6bce90cd07e4acd69bdd Mon Sep 17 00:00:00 2001 From: briancurtin Date: Tue, 15 Mar 2011 10:29:41 -0400 Subject: Fix #11509. Significantly increase test coverage for fileinput. Patch by Denver Coneybeare at the PyCon 2011 Sprints. --- Lib/test/test_fileinput.py | 620 +++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 605 insertions(+), 15 deletions(-) diff --git a/Lib/test/test_fileinput.py b/Lib/test/test_fileinput.py index f312882..b32e0e0 100644 --- a/Lib/test/test_fileinput.py +++ b/Lib/test/test_fileinput.py @@ -6,14 +6,18 @@ Nick Mathewson import unittest from test.support import verbose, TESTFN, run_unittest from test.support import unlink as safe_unlink -import sys, re +import os, sys, re from io import StringIO from fileinput import FileInput, hook_encoded +import fileinput +import collections +import gzip, bz2 +import types +import codecs # The fileinput module has 2 interfaces: the FileInput class which does # all the work, and a few functions (input, etc.) that use a global _state -# variable. We only test the FileInput class, since the other functions -# only provide a thin facade over FileInput. +# variable. # Write lines (a list of lines) to temp file number i, and return the # temp file's name. @@ -121,7 +125,16 @@ class BufferSizesTests(unittest.TestCase): self.assertEqual(int(m.group(1)), fi.filelineno()) fi.close() +class UnconditionallyRaise: + def __init__(self, exception_type): + self.exception_type = exception_type + self.invoked = False + def __call__(self, *args, **kwargs): + self.invoked = True + raise self.exception_type() + class FileInputTests(unittest.TestCase): + def test_zero_byte_files(self): t1 = t2 = t3 = t4 = None try: @@ -219,17 +232,20 @@ class FileInputTests(unittest.TestCase): self.fail("FileInput should check openhook for being callable") except ValueError: pass - # XXX The rot13 codec was removed. - # So this test needs to be changed to use something else. - # (Or perhaps the API needs to change so we can just pass - # an encoding rather than using a hook?) -## try: -## t1 = writeTmp(1, ["A\nB"], mode="wb") -## fi = FileInput(files=t1, openhook=hook_encoded("rot13")) -## lines = list(fi) -## self.assertEqual(lines, ["N\n", "O"]) -## finally: -## remove_tempfiles(t1) + + class CustomOpenHook: + def __init__(self): + self.invoked = False + def __call__(self, *args): + self.invoked = True + return open(*args) + + t = writeTmp(1, ["\n"]) + self.addCleanup(remove_tempfiles, t) + custom_open_hook = CustomOpenHook() + with FileInput([t], openhook=custom_open_hook) as fi: + fi.readline() + self.assertTrue(custom_open_hook.invoked, "openhook not invoked") def test_context_manager(self): try: @@ -254,9 +270,583 @@ class FileInputTests(unittest.TestCase): finally: remove_tempfiles(t1) + def test_empty_files_list_specified_to_constructor(self): + with FileInput(files=[]) as fi: + self.assertEquals(fi._files, ('-',)) + + def test__getitem__(self): + """Tests invoking FileInput.__getitem__() with the current + line number""" + t = writeTmp(1, ["line1\n", "line2\n"]) + self.addCleanup(remove_tempfiles, t) + with FileInput(files=[t]) as fi: + retval1 = fi[0] + self.assertEqual(retval1, "line1\n") + retval2 = fi[1] + self.assertEqual(retval2, "line2\n") + + def test__getitem__invalid_key(self): + """Tests invoking FileInput.__getitem__() with an index unequal to + the line number""" + t = writeTmp(1, ["line1\n", "line2\n"]) + self.addCleanup(remove_tempfiles, t) + with FileInput(files=[t]) as fi: + with self.assertRaises(RuntimeError) as cm: + fi[1] + self.assertEquals(cm.exception.args, ("accessing lines out of order",)) + + def test__getitem__eof(self): + """Tests invoking FileInput.__getitem__() with the line number but at + end-of-input""" + t = writeTmp(1, []) + self.addCleanup(remove_tempfiles, t) + with FileInput(files=[t]) as fi: + with self.assertRaises(IndexError) as cm: + fi[0] + self.assertEquals(cm.exception.args, ("end of input reached",)) + + def test_nextfile_oserror_deleting_backup(self): + """Tests invoking FileInput.nextfile() when the attempt to delete + the backup file would raise OSError. This error is expected to be + silently ignored""" + + os_unlink_orig = os.unlink + os_unlink_replacement = UnconditionallyRaise(OSError) + try: + t = writeTmp(1, ["\n"]) + self.addCleanup(remove_tempfiles, t) + with FileInput(files=[t], inplace=True) as fi: + next(fi) # make sure the file is opened + os.unlink = os_unlink_replacement + fi.nextfile() + finally: + os.unlink = os_unlink_orig + + # sanity check to make sure that our test scenario was actually hit + self.assertTrue(os_unlink_replacement.invoked, + "os.unlink() was not invoked") + + def test_readline_os_fstat_raises_OSError(self): + """Tests invoking FileInput.readline() when os.fstat() raises OSError. + This exception should be silently discarded.""" + + os_fstat_orig = os.fstat + os_fstat_replacement = UnconditionallyRaise(OSError) + try: + t = writeTmp(1, ["\n"]) + self.addCleanup(remove_tempfiles, t) + with FileInput(files=[t], inplace=True) as fi: + os.fstat = os_fstat_replacement + fi.readline() + finally: + os.fstat = os_fstat_orig + + # sanity check to make sure that our test scenario was actually hit + self.assertTrue(os_fstat_replacement.invoked, + "os.fstat() was not invoked") + + @unittest.skipIf(not hasattr(os, "chmod"), "os.chmod does not exist") + def test_readline_os_chmod_raises_OSError(self): + """Tests invoking FileInput.readline() when os.chmod() raises OSError. + This exception should be silently discarded.""" + + os_chmod_orig = os.chmod + os_chmod_replacement = UnconditionallyRaise(OSError) + try: + t = writeTmp(1, ["\n"]) + self.addCleanup(remove_tempfiles, t) + with FileInput(files=[t], inplace=True) as fi: + os.chmod = os_chmod_replacement + fi.readline() + finally: + os.chmod = os_chmod_orig + + # sanity check to make sure that our test scenario was actually hit + self.assertTrue(os_chmod_replacement.invoked, + "os.fstat() was not invoked") + + def test_fileno_when_ValueError_raised(self): + class FilenoRaisesValueError(UnconditionallyRaise): + def __init__(self): + UnconditionallyRaise.__init__(self, ValueError) + def fileno(self): + self.__call__() + + unconditionally_raise_ValueError = FilenoRaisesValueError() + t = writeTmp(1, ["\n"]) + self.addCleanup(remove_tempfiles, t) + with FileInput(files=[t]) as fi: + file_backup = fi._file + try: + fi._file = unconditionally_raise_ValueError + result = fi.fileno() + finally: + fi._file = file_backup # make sure the file gets cleaned up + + # sanity check to make sure that our test scenario was actually hit + self.assertTrue(unconditionally_raise_ValueError.invoked, + "_file.fileno() was not invoked") + + self.assertEqual(result, -1, "fileno() should return -1") + +class MockFileInput: + """A class that mocks out fileinput.FileInput for use during unit tests""" + + def __init__(self, files=None, inplace=False, backup="", bufsize=0, + mode="r", openhook=None): + self.files = files + self.inplace = inplace + self.backup = backup + self.bufsize = bufsize + self.mode = mode + self.openhook = openhook + self._file = None + self.invocation_counts = collections.defaultdict(lambda: 0) + self.return_values = {} + + def close(self): + self.invocation_counts["close"] += 1 + + def nextfile(self): + self.invocation_counts["nextfile"] += 1 + return self.return_values["nextfile"] + + def filename(self): + self.invocation_counts["filename"] += 1 + return self.return_values["filename"] + + def lineno(self): + self.invocation_counts["lineno"] += 1 + return self.return_values["lineno"] + + def filelineno(self): + self.invocation_counts["filelineno"] += 1 + return self.return_values["filelineno"] + + def fileno(self): + self.invocation_counts["fileno"] += 1 + return self.return_values["fileno"] + + def isfirstline(self): + self.invocation_counts["isfirstline"] += 1 + return self.return_values["isfirstline"] + + def isstdin(self): + self.invocation_counts["isstdin"] += 1 + return self.return_values["isstdin"] + +class BaseFileInputGlobalMethodsTest(unittest.TestCase): + """Base class for unit tests for the global function of + the fileinput module.""" + + def setUp(self): + self._orig_state = fileinput._state + self._orig_FileInput = fileinput.FileInput + fileinput.FileInput = MockFileInput + + def tearDown(self): + fileinput.FileInput = self._orig_FileInput + fileinput._state = self._orig_state + + def assertExactlyOneInvocation(self, mock_file_input, method_name): + # assert that the method with the given name was invoked once + actual_count = mock_file_input.invocation_counts[method_name] + self.assertEqual(actual_count, 1, method_name) + # assert that no other unexpected methods were invoked + actual_total_count = len(mock_file_input.invocation_counts) + self.assertEqual(actual_total_count, 1) + +class Test_fileinput_input(BaseFileInputGlobalMethodsTest): + """Unit tests for fileinput.input()""" + + def test_state_is_not_None_and_state_file_is_not_None(self): + """Tests invoking fileinput.input() when fileinput._state is not None + and its _file attribute is also not None. Expect RuntimeError to + be raised with a meaningful error message and for fileinput._state + to *not* be modified.""" + instance = MockFileInput() + instance._file = object() + fileinput._state = instance + with self.assertRaises(RuntimeError) as cm: + fileinput.input() + self.assertEqual(("input() already active",), cm.exception.args) + self.assertIs(instance, fileinput._state, "fileinput._state") + + def test_state_is_not_None_and_state_file_is_None(self): + """Tests invoking fileinput.input() when fileinput._state is not None + but its _file attribute *is* None. Expect it to create and return + a new fileinput.FileInput object with all method parameters passed + explicitly to the __init__() method; also ensure that + fileinput._state is set to the returned instance.""" + instance = MockFileInput() + instance._file = None + fileinput._state = instance + self.do_test_call_input() + + def test_state_is_None(self): + """Tests invoking fileinput.input() when fileinput._state is None + Expect it to create and return a new fileinput.FileInput object + with all method parameters passed explicitly to the __init__() + method; also ensure that fileinput._state is set to the returned + instance.""" + fileinput._state = None + self.do_test_call_input() + + def do_test_call_input(self): + """Tests that fileinput.input() creates a new fileinput.FileInput + object, passing the given parameters unmodified to + fileinput.FileInput.__init__(). Note that this test depends on the + monkey patching of fileinput.FileInput done by setUp().""" + files = object() + inplace = object() + backup = object() + bufsize = object() + mode = object() + openhook = object() + + # call fileinput.input() with different values for each argument + result = fileinput.input(files=files, inplace=inplace, backup=backup, + bufsize=bufsize, + mode=mode, openhook=openhook) + + # ensure fileinput._state was set to the returned object + self.assertIs(result, fileinput._state, "fileinput._state") + + # ensure the parameters to fileinput.input() were passed directly + # to FileInput.__init__() + self.assertIs(files, result.files, "files") + self.assertIs(inplace, result.inplace, "inplace") + self.assertIs(backup, result.backup, "backup") + self.assertIs(bufsize, result.bufsize, "bufsize") + self.assertIs(mode, result.mode, "mode") + self.assertIs(openhook, result.openhook, "openhook") + +class Test_fileinput_close(BaseFileInputGlobalMethodsTest): + """Unit tests for fileinput.close()""" + + def test_state_is_None(self): + """Tests that fileinput.close() does nothing if fileinput._state + is None""" + fileinput._state = None + fileinput.close() + self.assertIsNone(fileinput._state) + + def test_state_is_not_None(self): + """Tests that fileinput.close() invokes close() on fileinput._state + and sets _state=None""" + instance = MockFileInput() + fileinput._state = instance + fileinput.close() + self.assertExactlyOneInvocation(instance, "close") + self.assertIsNone(fileinput._state) + +class Test_fileinput_nextfile(BaseFileInputGlobalMethodsTest): + """Unit tests for fileinput.nextfile()""" + + def test_state_is_None(self): + """Tests fileinput.nextfile() when fileinput._state is None. + Ensure that it raises RuntimeError with a meaningful error message + and does not modify fileinput._state""" + fileinput._state = None + with self.assertRaises(RuntimeError) as cm: + fileinput.nextfile() + self.assertEqual(("no active input()",), cm.exception.args) + self.assertIsNone(fileinput._state) + + def test_state_is_not_None(self): + """Tests fileinput.nextfile() when fileinput._state is not None. + Ensure that it invokes fileinput._state.nextfile() exactly once, + returns whatever it returns, and does not modify fileinput._state + to point to a different object.""" + nextfile_retval = object() + instance = MockFileInput() + instance.return_values["nextfile"] = nextfile_retval + fileinput._state = instance + retval = fileinput.nextfile() + self.assertExactlyOneInvocation(instance, "nextfile") + self.assertIs(retval, nextfile_retval) + self.assertIs(fileinput._state, instance) + +class Test_fileinput_filename(BaseFileInputGlobalMethodsTest): + """Unit tests for fileinput.filename()""" + + def test_state_is_None(self): + """Tests fileinput.filename() when fileinput._state is None. + Ensure that it raises RuntimeError with a meaningful error message + and does not modify fileinput._state""" + fileinput._state = None + with self.assertRaises(RuntimeError) as cm: + fileinput.filename() + self.assertEqual(("no active input()",), cm.exception.args) + self.assertIsNone(fileinput._state) + + def test_state_is_not_None(self): + """Tests fileinput.filename() when fileinput._state is not None. + Ensure that it invokes fileinput._state.filename() exactly once, + returns whatever it returns, and does not modify fileinput._state + to point to a different object.""" + filename_retval = object() + instance = MockFileInput() + instance.return_values["filename"] = filename_retval + fileinput._state = instance + retval = fileinput.filename() + self.assertExactlyOneInvocation(instance, "filename") + self.assertIs(retval, filename_retval) + self.assertIs(fileinput._state, instance) + +class Test_fileinput_lineno(BaseFileInputGlobalMethodsTest): + """Unit tests for fileinput.lineno()""" + + def test_state_is_None(self): + """Tests fileinput.lineno() when fileinput._state is None. + Ensure that it raises RuntimeError with a meaningful error message + and does not modify fileinput._state""" + fileinput._state = None + with self.assertRaises(RuntimeError) as cm: + fileinput.lineno() + self.assertEqual(("no active input()",), cm.exception.args) + self.assertIsNone(fileinput._state) + + def test_state_is_not_None(self): + """Tests fileinput.lineno() when fileinput._state is not None. + Ensure that it invokes fileinput._state.lineno() exactly once, + returns whatever it returns, and does not modify fileinput._state + to point to a different object.""" + lineno_retval = object() + instance = MockFileInput() + instance.return_values["lineno"] = lineno_retval + fileinput._state = instance + retval = fileinput.lineno() + self.assertExactlyOneInvocation(instance, "lineno") + self.assertIs(retval, lineno_retval) + self.assertIs(fileinput._state, instance) + +class Test_fileinput_filelineno(BaseFileInputGlobalMethodsTest): + """Unit tests for fileinput.filelineno()""" + + def test_state_is_None(self): + """Tests fileinput.filelineno() when fileinput._state is None. + Ensure that it raises RuntimeError with a meaningful error message + and does not modify fileinput._state""" + fileinput._state = None + with self.assertRaises(RuntimeError) as cm: + fileinput.filelineno() + self.assertEqual(("no active input()",), cm.exception.args) + self.assertIsNone(fileinput._state) + + def test_state_is_not_None(self): + """Tests fileinput.filelineno() when fileinput._state is not None. + Ensure that it invokes fileinput._state.filelineno() exactly once, + returns whatever it returns, and does not modify fileinput._state + to point to a different object.""" + filelineno_retval = object() + instance = MockFileInput() + instance.return_values["filelineno"] = filelineno_retval + fileinput._state = instance + retval = fileinput.filelineno() + self.assertExactlyOneInvocation(instance, "filelineno") + self.assertIs(retval, filelineno_retval) + self.assertIs(fileinput._state, instance) + +class Test_fileinput_fileno(BaseFileInputGlobalMethodsTest): + """Unit tests for fileinput.fileno()""" + + def test_state_is_None(self): + """Tests fileinput.fileno() when fileinput._state is None. + Ensure that it raises RuntimeError with a meaningful error message + and does not modify fileinput._state""" + fileinput._state = None + with self.assertRaises(RuntimeError) as cm: + fileinput.fileno() + self.assertEqual(("no active input()",), cm.exception.args) + self.assertIsNone(fileinput._state) + + def test_state_is_not_None(self): + """Tests fileinput.fileno() when fileinput._state is not None. + Ensure that it invokes fileinput._state.fileno() exactly once, + returns whatever it returns, and does not modify fileinput._state + to point to a different object.""" + fileno_retval = object() + instance = MockFileInput() + instance.return_values["fileno"] = fileno_retval + instance.fileno_retval = fileno_retval + fileinput._state = instance + retval = fileinput.fileno() + self.assertExactlyOneInvocation(instance, "fileno") + self.assertIs(retval, fileno_retval) + self.assertIs(fileinput._state, instance) + +class Test_fileinput_isfirstline(BaseFileInputGlobalMethodsTest): + """Unit tests for fileinput.isfirstline()""" + + def test_state_is_None(self): + """Tests fileinput.isfirstline() when fileinput._state is None. + Ensure that it raises RuntimeError with a meaningful error message + and does not modify fileinput._state""" + fileinput._state = None + with self.assertRaises(RuntimeError) as cm: + fileinput.isfirstline() + self.assertEqual(("no active input()",), cm.exception.args) + self.assertIsNone(fileinput._state) + + def test_state_is_not_None(self): + """Tests fileinput.isfirstline() when fileinput._state is not None. + Ensure that it invokes fileinput._state.isfirstline() exactly once, + returns whatever it returns, and does not modify fileinput._state + to point to a different object.""" + isfirstline_retval = object() + instance = MockFileInput() + instance.return_values["isfirstline"] = isfirstline_retval + fileinput._state = instance + retval = fileinput.isfirstline() + self.assertExactlyOneInvocation(instance, "isfirstline") + self.assertIs(retval, isfirstline_retval) + self.assertIs(fileinput._state, instance) + +class Test_fileinput_isstdin(BaseFileInputGlobalMethodsTest): + """Unit tests for fileinput.isstdin()""" + + def test_state_is_None(self): + """Tests fileinput.isstdin() when fileinput._state is None. + Ensure that it raises RuntimeError with a meaningful error message + and does not modify fileinput._state""" + fileinput._state = None + with self.assertRaises(RuntimeError) as cm: + fileinput.isstdin() + self.assertEqual(("no active input()",), cm.exception.args) + self.assertIsNone(fileinput._state) + + def test_state_is_not_None(self): + """Tests fileinput.isstdin() when fileinput._state is not None. + Ensure that it invokes fileinput._state.isstdin() exactly once, + returns whatever it returns, and does not modify fileinput._state + to point to a different object.""" + isstdin_retval = object() + instance = MockFileInput() + instance.return_values["isstdin"] = isstdin_retval + fileinput._state = instance + retval = fileinput.isstdin() + self.assertExactlyOneInvocation(instance, "isstdin") + self.assertIs(retval, isstdin_retval) + self.assertIs(fileinput._state, instance) + +class InvocationRecorder: + def __init__(self): + self.invocation_count = 0 + def __call__(self, *args, **kwargs): + self.invocation_count += 1 + self.last_invocation = (args, kwargs) + +class Test_hook_compressed(unittest.TestCase): + """Unit tests for fileinput.hook_compressed()""" + + def setUp(self): + self.fake_open = InvocationRecorder() + + def test_empty_string(self): + self.do_test_use_builtin_open("", 1) + + def test_no_ext(self): + self.do_test_use_builtin_open("abcd", 2) + + def test_gz_ext(self): + original_open = gzip.open + gzip.open = self.fake_open + try: + result = fileinput.hook_compressed("test.gz", 3) + finally: + gzip.open = original_open + + self.assertEqual(self.fake_open.invocation_count, 1) + self.assertEqual(self.fake_open.last_invocation, (("test.gz", 3), {})) + + def test_bz2_ext(self): + original_open = bz2.BZ2File + bz2.BZ2File = self.fake_open + try: + result = fileinput.hook_compressed("test.bz2", 4) + finally: + bz2.BZ2File = original_open + + self.assertEqual(self.fake_open.invocation_count, 1) + self.assertEqual(self.fake_open.last_invocation, (("test.bz2", 4), {})) + + def test_blah_ext(self): + self.do_test_use_builtin_open("abcd.blah", 5) + + def test_Gz_ext(self): + self.do_test_use_builtin_open("abcd.Gz", 6) + + def test_Bz2_ext(self): + self.do_test_use_builtin_open("abcd.Bz2", 7) + + def do_test_use_builtin_open(self, filename, mode): + original_open = self.replace_builtin_open(self.fake_open) + try: + result = fileinput.hook_compressed(filename, mode) + finally: + self.replace_builtin_open(original_open) + + self.assertEqual(self.fake_open.invocation_count, 1) + self.assertEqual(self.fake_open.last_invocation, + ((filename, mode), {})) + + @staticmethod + def replace_builtin_open(new_open_func): + builtins_type = type(__builtins__) + if builtins_type is dict: + original_open = __builtins__["open"] + __builtins__["open"] = new_open_func + elif builtins_type is types.ModuleType: + original_open = __builtins__.open + __builtins__.open = new_open_func + else: + raise RuntimeError( + "unknown __builtins__ type: %r (unable to replace open)" % + builtins_type) + + return original_open + +class Test_hook_encoded(unittest.TestCase): + """Unit tests for fileinput.hook_encoded()""" + + def test(self): + encoding = object() + result = fileinput.hook_encoded(encoding) + + fake_open = InvocationRecorder() + original_open = codecs.open + codecs.open = fake_open + try: + filename = object() + mode = object() + open_result = result(filename, mode) + finally: + codecs.open = original_open + + self.assertEqual(fake_open.invocation_count, 1) + + args = fake_open.last_invocation[0] + self.assertIs(args[0], filename) + self.assertIs(args[1], mode) + self.assertIs(args[2], encoding) def test_main(): - run_unittest(BufferSizesTests, FileInputTests) + run_unittest( + BufferSizesTests, + FileInputTests, + Test_fileinput_input, + Test_fileinput_close, + Test_fileinput_nextfile, + Test_fileinput_filename, + Test_fileinput_lineno, + Test_fileinput_filelineno, + Test_fileinput_fileno, + Test_fileinput_isfirstline, + Test_fileinput_isstdin, + Test_hook_compressed, + Test_hook_encoded, + ) if __name__ == "__main__": test_main() -- cgit v0.12