diff --git a/fastapi/dependencies/models.py b/fastapi/dependencies/models.py index 2a4d9a010..61cfbcb0a 100644 --- a/fastapi/dependencies/models.py +++ b/fastapi/dependencies/models.py @@ -15,6 +15,19 @@ else: # pragma: no cover from asyncio import iscoroutinefunction +def _unwrapped_call(call: Optional[Callable[..., Any]]) -> Optional[Callable[..., Any]]: + if call is None: + return call # pragma: no cover + unwrapped = inspect.unwrap(_impartial(call)) + return unwrapped + + +def _impartial(func: Callable[..., Any]) -> Callable[..., Any]: + while isinstance(func, partial): + func = func.func + return func + + @dataclass class SecurityRequirement: security_scheme: SecurityBase @@ -75,37 +88,57 @@ class Dependant: return True return False - @cached_property - def _unwrapped_call(self) -> Any: - if self.call is None: - return self.call # pragma: no cover - unwrapped = inspect.unwrap(self.call) - if isinstance(unwrapped, partial): - unwrapped = unwrapped.func - return unwrapped - @cached_property def is_gen_callable(self) -> bool: - if inspect.isgeneratorfunction(self._unwrapped_call): + if self.call is None: + return False + if inspect.isgeneratorfunction( + _impartial(self.call) + ) or inspect.isgeneratorfunction(_unwrapped_call(self.call)): return True - dunder_call = getattr(self._unwrapped_call, "__call__", None) # noqa: B004 - return inspect.isgeneratorfunction(dunder_call) + dunder_call = getattr(_impartial(self.call), "__call__", None) # noqa: B004 + if dunder_call is None: + return False + return inspect.isgeneratorfunction( + _impartial(dunder_call) + ) or inspect.isgeneratorfunction(_unwrapped_call(dunder_call)) @cached_property def is_async_gen_callable(self) -> bool: - if inspect.isasyncgenfunction(self._unwrapped_call): + if self.call is None: + return False + if inspect.isasyncgenfunction( + _impartial(self.call) + ) or inspect.isasyncgenfunction(_unwrapped_call(self.call)): return True - dunder_call = getattr(self._unwrapped_call, "__call__", None) # noqa: B004 - return inspect.isasyncgenfunction(dunder_call) + dunder_call = getattr(_impartial(self.call), "__call__", None) # noqa: B004 + if dunder_call is None: + return False + return inspect.isasyncgenfunction( + _impartial(dunder_call) + ) or inspect.isasyncgenfunction(_unwrapped_call(dunder_call)) @cached_property def is_coroutine_callable(self) -> bool: - if inspect.isroutine(self._unwrapped_call): - return iscoroutinefunction(self._unwrapped_call) - if inspect.isclass(self._unwrapped_call): + if self.call is None: return False - dunder_call = getattr(self._unwrapped_call, "__call__", None) # noqa: B004 - return iscoroutinefunction(dunder_call) + if inspect.isroutine(_impartial(self.call)) and iscoroutinefunction( + _impartial(self.call) + ): + return True + if inspect.isroutine(_unwrapped_call(self.call)) and iscoroutinefunction( + _unwrapped_call(self.call) + ): + return True + dunder_call = getattr(_impartial(self.call), "__call__", None) # noqa: B004 + if dunder_call is None: + return False + if iscoroutinefunction(_impartial(dunder_call)) or iscoroutinefunction( + _unwrapped_call(dunder_call) + ): + return True + # if inspect.isclass(self.call): False, covered by default return + return False @cached_property def computed_scope(self) -> Union[str, None]: