Source code for allows.side_effect

from itertools import chain, repeat
from unittest.mock import call
from typing import Any, Callable

from .exception import AllowsException


class SideEffectBuilder:
    """
    Uses builder pattern to set up Callable effect and Call argument values
    for SideEffect objects. Only one effect can be applied in a given builder
    (return, exception, effect).

    Every ``with_*`` method returns the builder itself so calls can be
    chained; ``build()`` produces the resulting ``SideEffect``.
    """

    def __init__(self, call_args=None, effect=None) -> None:
        """
        Start a builder, optionally pre-seeded with call args and/or an effect.
        """
        self._call_args = call_args
        self._effect = effect

    def with_call_args(self, *args, **kwargs):
        """
        Only invoke side effect when certain args are present

        The positional and keyword arguments are recorded as a
        ``unittest.mock.call`` object.

        Raises:
            AllowsException: if call args have already been set on this
                builder.
        """
        if self._call_args:
            raise AllowsException("Cannot set multiple call args in one effect")
        self._call_args = call(*args, **kwargs)
        return self

    def with_raised_exception(self, raised_exception: Exception):
        """
        Raise an exception.

        The resulting effect raises ``raised_exception`` every time it is
        invoked, whatever arguments it receives.

        Raises:
            AllowsException: if an effect has already been set.
        """

        def raise_(*args, **kwargs):
            raise raised_exception

        return self._set_effect(raise_)

    def with_return_value(self, *return_values: Any):
        """
        Add a return value.

        The resulting effect returns the given values one per invocation, in
        order; once they are exhausted the last value is returned on every
        further invocation.

        Raises:
            AllowsException: if no return value is given, or if an effect has
                already been set.
        """
        if not return_values:
            # Without this guard ``return_values[-1]`` fails with a bare
            # IndexError that says nothing about the actual misuse.
            raise AllowsException("At least one return value is required")

        # Finite sequence first, then the final value repeated forever.
        return_value_iterator = chain(return_values, repeat(return_values[-1]))

        def return_next(*args, **kwargs):
            return next(return_value_iterator)

        return self._set_effect(return_next)

    def with_effect(self, effect: Callable):
        """
        Add a generic effect to the side effect.

        ``effect`` is invoked with the same positional and keyword arguments
        the side effect receives, and its result is returned.

        Raises:
            AllowsException: if an effect has already been set.
        """
        return self._set_effect(effect)

    def _set_effect(self, effect: Callable):
        """
        Store ``effect`` as this builder's single effect and return the builder.

        Raises:
            AllowsException: if an effect has already been set.
        """
        if self._effect:
            raise AllowsException("Cannot set multiple effects")
        # The effect is stored behind a plain function wrapper; a function
        # object is always truthy, which the truthiness checks on the stored
        # effect rely on.
        self._effect = lambda *args, **kwargs: effect(*args, **kwargs)
        return self

    def build(self):
        """
        Create the side effect.

        Returns a ``SideEffect`` constructed from the call args and effect
        configured so far.
        """
        return SideEffect(call_args=self._call_args, effect=self._effect)
def no_op(*args, **kwargs):
    """Accept any positional or keyword arguments, do nothing, return ``None``."""
    return None
class SideEffect:
    """
    Callable object that can compose many side effects with corresponding
    arguments or default response.

    Internally ``_calls`` and ``_effects`` are parallel lists: the effect at
    index ``i`` is run when the object is invoked with arguments equal to
    ``_calls[i]``. ``_default_effect`` is used when no entry matches.
    """

    def __init__(self, call_args=None, effect=None, default_effect=None):
        """
        Set up a single (call args, effect) pairing or a default effect.

        * ``call_args`` and ``effect``: ``effect`` runs for calls matching
          ``call_args``.
        * ``call_args`` only: matching calls do nothing and return ``None``.
        * ``effect`` only: ``effect`` becomes the default effect (overriding
          any ``default_effect`` argument).
        * neither: a call with no arguments does nothing and returns ``None``.
        """
        self._calls = []
        self._effects = []
        if call_args:
            self._calls.append(call_args)
            # Always store an effect alongside the call so the two lists stay
            # index-aligned, including after merge().
            self._effects.append(effect if effect else no_op)
        elif effect:
            default_effect = effect
        else:
            self._calls.append(call())
            self._effects.append(no_op)
        self._default_effect = default_effect

    def __call__(self, *args, **kwargs):
        """
        Run the effect registered for these arguments, else the default effect.

        Raises:
            TypeError: if no registered call matches and no default effect
                is set.
        """
        call_args = call(*args, **kwargs)
        effect = self._effect_lookup(call_args)
        if effect is None:
            # Same exception type as calling ``None`` would produce, but with
            # a message that names the unexpected call.
            raise TypeError(
                "No side effect configured for {!r} and no default effect "
                "is set".format(call_args)
            )
        return effect(*args, **kwargs)

    def _effect_lookup(self, call_args):
        """
        Return the effect paired with ``call_args``, or the default effect
        (possibly ``None``) when there is no match. The first matching entry
        wins.
        """
        try:
            position = self._calls.index(call_args)
        except ValueError:
            return self._default_effect
        return self._effects[position]

    def merge(self, other: "SideEffect") -> "SideEffect":
        """
        Merge another SideEffect into this one and return this one.

        This object is modified in place: the other object's call/effect
        pairs are appended after the existing ones, and its default effect is
        adopted only if this object has none. ``other`` is left unchanged.
        Passing ``None`` (or this same object) is a no-op.

        Raises:
            AllowsException: if ``other`` is not a SideEffect.
        """
        if other is None or other is self:
            return self
        if isinstance(other, self.__class__):
            self._calls.extend(other._calls)
            self._effects.extend(other._effects)
            self._default_effect = self._default_effect or other._default_effect
            return self
        raise AllowsException("Cannot merge effects that are not SideEffect")