diff options
Diffstat (limited to 'Lib/unittest/mock.py')
| -rw-r--r-- | Lib/unittest/mock.py | 324 |
1 files changed, 233 insertions, 91 deletions
diff --git a/Lib/unittest/mock.py b/Lib/unittest/mock.py index 073869a..573c799 100644 --- a/Lib/unittest/mock.py +++ b/Lib/unittest/mock.py @@ -27,7 +27,7 @@ __version__ = '1.0' import inspect import pprint import sys -from functools import wraps +from functools import wraps, partial BaseExceptions = (BaseException,) @@ -66,55 +66,45 @@ DescriptorTypes = ( ) -def _getsignature(func, skipfirst, instance=False): - if isinstance(func, type) and not instance: +def _get_signature_object(func, as_instance, eat_self): + """ + Given an arbitrary, possibly callable object, try to create a suitable + signature object. + Return a (reduced func, signature) tuple, or None. + """ + if isinstance(func, type) and not as_instance: + # If it's a type and should be modelled as a type, use __init__. try: func = func.__init__ except AttributeError: - return - skipfirst = True + return None + # Skip the `self` argument in __init__ + eat_self = True elif not isinstance(func, FunctionTypes): - # for classes where instance is True we end up here too + # If we really want to model an instance of the passed type, + # __call__ should be looked up, not __init__. try: func = func.__call__ except AttributeError: - return - + return None + if eat_self: + sig_func = partial(func, None) + else: + sig_func = func try: - argspec = inspect.getfullargspec(func) - except TypeError: - # C function / method, possibly inherited object().__init__ - return - - regargs, varargs, varkw, defaults, kwonly, kwonlydef, ann = argspec - - - # instance methods and classmethods need to lose the self argument - if getattr(func, '__self__', None) is not None: - regargs = regargs[1:] - if skipfirst: - # this condition and the above one are never both True - why? - regargs = regargs[1:] - - signature = inspect.formatargspec( - regargs, varargs, varkw, defaults, - kwonly, kwonlydef, ann, formatvalue=lambda value: "") - return signature[1:-1], func + return func, inspect.signature(sig_func) + except ValueError: + # Certain callable types are not supported by inspect.signature() + return None def _check_signature(func, mock, skipfirst, instance=False): - if not _callable(func): - return - - result = _getsignature(func, skipfirst, instance) - if result is None: + sig = _get_signature_object(func, instance, skipfirst) + if sig is None: return - signature, func = result - - # can't use self because "self" is common as an argument name - # unfortunately even not in the first place - src = "lambda _mock_self, %s: None" % signature - checksig = eval(src, {}) + func, sig = sig + def checksig(_mock_self, *args, **kwargs): + sig.bind(*args, **kwargs) _copy_func_details(func, checksig) type(mock)._mock_check_sig = checksig @@ -122,11 +112,24 @@ def _check_signature(func, mock, skipfirst, instance=False): def _copy_func_details(func, funcopy): funcopy.__name__ = func.__name__ funcopy.__doc__ = func.__doc__ + try: + funcopy.__text_signature__ = func.__text_signature__ + except AttributeError: + pass # we explicitly don't copy func.__dict__ into this copy as it would # expose original attributes that should be mocked - funcopy.__module__ = func.__module__ - funcopy.__defaults__ = func.__defaults__ - funcopy.__kwdefaults__ = func.__kwdefaults__ + try: + funcopy.__module__ = func.__module__ + except AttributeError: + pass + try: + funcopy.__defaults__ = func.__defaults__ + except AttributeError: + pass + try: + funcopy.__kwdefaults__ = func.__kwdefaults__ + except AttributeError: + pass def _callable(obj): @@ -166,15 +169,12 @@ def _set_signature(mock, original, instance=False): return skipfirst = isinstance(original, type) - result = _getsignature(original, skipfirst, instance) + result = _get_signature_object(original, instance, skipfirst) if result is None: - # was a C function (e.g. object().__init__ ) that can't be mocked return - - signature, func = result - - src = "lambda %s: None" % signature - checksig = eval(src, {}) + func, sig = result + def checksig(*args, **kwargs): + sig.bind(*args, **kwargs) _copy_func_details(func, checksig) name = original.__name__ @@ -343,7 +343,14 @@ def _check_and_set_parent(parent, value, name, new_name): value._mock_name = name return True - +# Internal class to identify if we wrapped an iterator object or not. +class _MockIter(object): + def __init__(self, obj): + self.obj = iter(obj) + def __iter__(self): + return self + def __next__(self): + return next(self.obj) class Base(object): _mock_return_value = DEFAULT @@ -368,7 +375,7 @@ class NonCallableMock(Base): def __init__( self, spec=None, wraps=None, name=None, spec_set=None, parent=None, _spec_state=None, _new_name='', _new_parent=None, - **kwargs + _spec_as_instance=False, _eat_self=None, **kwargs ): if _new_parent is None: _new_parent = parent @@ -382,8 +389,10 @@ class NonCallableMock(Base): if spec_set is not None: spec = spec_set spec_set = True + if _eat_self is None: + _eat_self = parent is not None - self._mock_add_spec(spec, spec_set) + self._mock_add_spec(spec, spec_set, _spec_as_instance, _eat_self) __dict__['_mock_children'] = {} __dict__['_mock_wraps'] = wraps @@ -428,20 +437,26 @@ class NonCallableMock(Base): self._mock_add_spec(spec, spec_set) - def _mock_add_spec(self, spec, spec_set): + def _mock_add_spec(self, spec, spec_set, _spec_as_instance=False, + _eat_self=False): _spec_class = None + _spec_signature = None if spec is not None and not _is_list(spec): if isinstance(spec, type): _spec_class = spec else: _spec_class = _get_class(spec) + res = _get_signature_object(spec, + _spec_as_instance, _eat_self) + _spec_signature = res and res[1] spec = dir(spec) __dict__ = self.__dict__ __dict__['_spec_class'] = _spec_class __dict__['_spec_set'] = spec_set + __dict__['_spec_signature'] = _spec_signature __dict__['_mock_methods'] = spec @@ -487,7 +502,11 @@ class NonCallableMock(Base): delegated = self._mock_delegate if delegated is None: return self._mock_side_effect - return delegated.side_effect + sf = delegated.side_effect + if sf is not None and not callable(sf) and not isinstance(sf, _MockIter): + sf = _MockIter(sf) + delegated.side_effect = sf + return sf def __set_side_effect(self, value): value = _try_iter(value) @@ -500,8 +519,14 @@ class NonCallableMock(Base): side_effect = property(__get_side_effect, __set_side_effect) - def reset_mock(self): + def reset_mock(self, visited=None): "Restore the mock object to its initial state." + if visited is None: + visited = [] + if id(self) in visited: + return + visited.append(id(self)) + self.called = False self.call_args = None self.call_count = 0 @@ -512,11 +537,11 @@ class NonCallableMock(Base): for child in self._mock_children.values(): if isinstance(child, _SpecState): continue - child.reset_mock() + child.reset_mock(visited) ret = self._mock_return_value if _is_instance_mock(ret) and ret is not self: - ret.reset_mock() + ret.reset_mock(visited) def configure_mock(self, **kwargs): @@ -695,7 +720,6 @@ class NonCallableMock(Base): self._mock_children[name] = _deleted - def _format_mock_call_signature(self, args, kwargs): name = self._mock_name or 'mock' return _format_call_signature(name, args, kwargs) @@ -711,6 +735,28 @@ class NonCallableMock(Base): return message % (expected_string, actual_string) + def _call_matcher(self, _call): + """ + Given a call (or simply a (args, kwargs) tuple), return a + comparison key suitable for matching with other calls. + This is a best effort method which relies on the spec's signature, + if available, or falls back on the arguments themselves. + """ + sig = self._spec_signature + if sig is not None: + if len(_call) == 2: + name = '' + args, kwargs = _call + else: + name, args, kwargs = _call + try: + return name, sig.bind(*args, **kwargs) + except TypeError as e: + return e.with_traceback(None) + else: + return _call + + def assert_called_with(_mock_self, *args, **kwargs): """assert that the mock was called with the specified arguments. @@ -721,9 +767,14 @@ class NonCallableMock(Base): expected = self._format_mock_call_signature(args, kwargs) raise AssertionError('Expected call: %s\nNot called' % (expected,)) - if self.call_args != (args, kwargs): + def _error_message(): msg = self._format_mock_failure_message(args, kwargs) - raise AssertionError(msg) + return msg + expected = self._call_matcher((args, kwargs)) + actual = self._call_matcher(self.call_args) + if expected != actual: + cause = expected if isinstance(expected, Exception) else None + raise AssertionError(_error_message()) from cause def assert_called_once_with(_mock_self, *args, **kwargs): @@ -747,18 +798,21 @@ class NonCallableMock(Base): If `any_order` is True then the calls can be in any order, but they must all appear in `mock_calls`.""" + expected = [self._call_matcher(c) for c in calls] + cause = expected if isinstance(expected, Exception) else None + all_calls = _CallList(self._call_matcher(c) for c in self.mock_calls) if not any_order: - if calls not in self.mock_calls: + if expected not in all_calls: raise AssertionError( 'Calls not found.\nExpected: %r\n' 'Actual: %r' % (calls, self.mock_calls) - ) + ) from cause return - all_calls = list(self.mock_calls) + all_calls = list(all_calls) not_found = [] - for kall in calls: + for kall in expected: try: all_calls.remove(kall) except ValueError: @@ -766,7 +820,7 @@ class NonCallableMock(Base): if not_found: raise AssertionError( '%r not all found in call list' % (tuple(not_found),) - ) + ) from cause def assert_any_call(self, *args, **kwargs): @@ -775,12 +829,14 @@ class NonCallableMock(Base): The assert passes if the mock has *ever* been called, unlike `assert_called_with` and `assert_called_once_with` that only pass if the call is the most recent one.""" - kall = call(*args, **kwargs) - if kall not in self.call_args_list: + expected = self._call_matcher((args, kwargs)) + actual = [self._call_matcher(c) for c in self.call_args_list] + if expected not in actual: + cause = expected if isinstance(expected, Exception) else None expected_string = self._format_mock_call_signature(args, kwargs) raise AssertionError( '%s call not found' % expected_string - ) + ) from cause def _get_child_mock(self, **kw): @@ -850,11 +906,12 @@ class CallableMixin(Base): self = _mock_self self.called = True self.call_count += 1 - self.call_args = _Call((args, kwargs), two=True) - self.call_args_list.append(_Call((args, kwargs), two=True)) - _new_name = self._mock_new_name _new_parent = self._mock_new_parent + + _call = _Call((args, kwargs), two=True) + self.call_args = _call + self.call_args_list.append(_call) self.mock_calls.append(_Call(('', args, kwargs))) seen = set() @@ -909,8 +966,6 @@ class CallableMixin(Base): return result ret_val = effect(*args, **kwargs) - if ret_val is DEFAULT: - ret_val = self.return_value if (self._mock_wraps is not None and self._mock_return_value is DEFAULT): @@ -1001,7 +1056,7 @@ def _is_started(patcher): class _patch(object): attribute_name = None - _active_patches = set() + _active_patches = [] def __init__( self, getter, attribute, new, spec, create, @@ -1274,13 +1329,18 @@ class _patch(object): def start(self): """Activate a patch, returning any created mock.""" result = self.__enter__() - self._active_patches.add(self) + self._active_patches.append(self) return result def stop(self): """Stop an active patch.""" - self._active_patches.discard(self) + try: + self._active_patches.remove(self) + except ValueError: + # If the patch hasn't been started this will fail + pass + return self.__exit__() @@ -1402,7 +1462,7 @@ def patch( used. A more powerful form of `spec` is `autospec`. If you set `autospec=True` - then the mock with be created with a spec from the object being replaced. + then the mock will be created with a spec from the object being replaced. All attributes of the mock will also have the spec of the corresponding attribute of the object being replaced. Methods and functions being mocked will have their arguments checked and will raise a `TypeError` if @@ -1573,8 +1633,8 @@ def _clear_dict(in_dict): def _patch_stopall(): - """Stop all active patches.""" - for patch in list(_patch._active_patches): + """Stop all active patches. LIFO to unroll nested patches.""" + for patch in reversed(_patch._active_patches): patch.stop() @@ -1590,13 +1650,17 @@ magic_methods = ( "len contains iter " "hash str sizeof " "enter exit " - "divmod neg pos abs invert " + # we added divmod and rdivmod here instead of numerics + # because there is no idivmod + "divmod rdivmod neg pos abs invert " "complex int float index " "trunc floor ceil " "bool next " ) -numerics = "add sub mul div floordiv mod lshift rshift and xor or pow " +numerics = ( + "add sub mul div floordiv mod lshift rshift and xor or pow truediv" +) inplace = ' '.join('i%s' % n for n in numerics.split()) right = ' '.join('r%s' % n for n in numerics.split()) @@ -1713,14 +1777,15 @@ def _set_return_value(mock, method, name): class MagicMixin(object): def __init__(self, *args, **kw): + self._mock_set_magics() # make magic work for kwargs in init _safe_super(MagicMixin, self).__init__(*args, **kw) - self._mock_set_magics() + self._mock_set_magics() # fix magic broken by upper level init def _mock_set_magics(self): these_magics = _magics - if self._mock_methods is not None: + if getattr(self, "_mock_methods", None) is not None: these_magics = _magics.intersection(self._mock_methods) remove_magics = set() @@ -1921,8 +1986,7 @@ class _Call(tuple): else: other_args = () other_kwargs = value - else: - # len 2 + elif len_other == 2: # could be (name, args) or (name, kwargs) or (args, kwargs) first, second = other if isinstance(first, str): @@ -1933,6 +1997,8 @@ class _Call(tuple): other_args, other_kwargs = (), second else: other_args, other_kwargs = first, second + else: + return False if self_name and other_name != self_name: return False @@ -2030,6 +2096,8 @@ def create_autospec(spec, spec_set=False, instance=False, _parent=None, elif spec is None: # None we mock with a normal mock without a spec _kwargs = {} + if _kwargs and instance: + _kwargs['_spec_as_instance'] = True _kwargs.update(kwargs) @@ -2043,6 +2111,8 @@ def create_autospec(spec, spec_set=False, instance=False, _parent=None, elif is_type and instance and not _instance_callable(spec): Klass = NonCallableMagicMock + _name = _kwargs.pop('name', _name) + _new_name = _name if _parent is None: # for a top level object no _new_name should be set @@ -2096,10 +2166,12 @@ def create_autospec(spec, spec_set=False, instance=False, _parent=None, if isinstance(spec, FunctionTypes): parent = mock.mock + skipfirst = _must_skip(spec, entry, is_type) + kwargs['_eat_self'] = skipfirst new = MagicMock(parent=parent, name=entry, _new_name=entry, - _new_parent=parent, **kwargs) + _new_parent=parent, + **kwargs) mock._mock_children[entry] = new - skipfirst = _must_skip(spec, entry, is_type) _check_signature(original, new, skipfirst=skipfirst) # so functions created with _set_signature become instance attributes, @@ -2113,6 +2185,10 @@ def create_autospec(spec, spec_set=False, instance=False, _parent=None, def _must_skip(spec, entry, is_type): + """ + Return whether we should skip the first argument on spec's `entry` + attribute. + """ if not isinstance(spec, type): if entry in getattr(spec, '__dict__', {}): # instance attribute - shouldn't skip @@ -2125,7 +2201,12 @@ def _must_skip(spec, entry, is_type): continue if isinstance(result, (staticmethod, classmethod)): return False - return is_type + elif isinstance(getattr(result, '__get__', None), MethodWrapperTypes): + # Normal method => skip if looked up on type + # (if looked up on instance, self is already skipped) + return is_type + else: + return False # shouldn't get here unless function is a dynamically provided attribute # XXXX untested behaviour @@ -2159,9 +2240,33 @@ FunctionTypes = ( type(ANY.__eq__), ) +MethodWrapperTypes = ( + type(ANY.__eq__.__get__), +) + file_spec = None +def _iterate_read_data(read_data): + # Helper for mock_open: + # Retrieve lines from read_data via a generator so that separate calls to + # readline, read, and readlines are properly interleaved + sep = b'\n' if isinstance(read_data, bytes) else '\n' + data_as_list = [l + sep for l in read_data.split(sep)] + + if data_as_list[-1] == sep: + # If the last line ended in a newline, the list comprehension will have an + # extra entry that's just a newline. Remove this. + data_as_list = data_as_list[:-1] + else: + # If there wasn't an extra newline by itself, then the file being + # emulated doesn't have a newline to end the last line remove the + # newline that our naive format() added + data_as_list[-1] = data_as_list[-1][:-1] + + for line in data_as_list: + yield line + def mock_open(mock=None, read_data=''): """ @@ -2172,9 +2277,27 @@ def mock_open(mock=None, read_data=''): default) then a `MagicMock` will be created for you, with the API limited to methods or attributes available on standard file handles. - `read_data` is a string for the `read` method of the file handle to return. - This is an empty string by default. + `read_data` is a string for the `read` methoddline`, and `readlines` of the + file handle to return. This is an empty string by default. """ + def _readlines_side_effect(*args, **kwargs): + if handle.readlines.return_value is not None: + return handle.readlines.return_value + return list(_state[0]) + + def _read_side_effect(*args, **kwargs): + if handle.read.return_value is not None: + return handle.read.return_value + return type(read_data)().join(_state[0]) + + def _readline_side_effect(): + if handle.readline.return_value is not None: + while True: + yield handle.readline.return_value + for line in _state[0]: + yield line + + global file_spec if file_spec is None: import _io @@ -2184,10 +2307,29 @@ def mock_open(mock=None, read_data=''): mock = MagicMock(name='open', spec=open) handle = MagicMock(spec=file_spec) - handle.write.return_value = None handle.__enter__.return_value = handle - handle.read.return_value = read_data + _state = [_iterate_read_data(read_data), None] + + handle.write.return_value = None + handle.read.return_value = None + handle.readline.return_value = None + handle.readlines.return_value = None + + handle.read.side_effect = _read_side_effect + _state[1] = _readline_side_effect() + handle.readline.side_effect = _state[1] + handle.readlines.side_effect = _readlines_side_effect + + def reset_data(*args, **kwargs): + _state[0] = _iterate_read_data(read_data) + if handle.readline.side_effect == _state[1]: + # Only reset the side effect if the user hasn't overridden it. + _state[1] = _readline_side_effect() + handle.readline.side_effect = _state[1] + return DEFAULT + + mock.side_effect = reset_data mock.return_value = handle return mock |
