♻️ Reduce internal cyclic recursion in dependencies, from 2 functions calling each other to 1 calling itself (#14256)

This commit is contained in:
Sebastián Ramírez 2025-10-30 16:35:04 -03:00 committed by GitHub
parent 1fc586c3a5
commit dcfb8b9dda
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 101 additions and 36 deletions

View File

@ -129,39 +129,12 @@ def get_parameterless_sub_dependant(*, depends: params.Depends, path: str) -> De
assert callable(depends.dependency), ( assert callable(depends.dependency), (
"A parameter-less dependency must have a callable dependency" "A parameter-less dependency must have a callable dependency"
) )
return get_sub_dependant(depends=depends, dependency=depends.dependency, path=path) use_security_scopes: List[str] = []
if isinstance(depends, params.Security) and depends.scopes:
use_security_scopes.extend(depends.scopes)
def get_sub_dependant( return get_dependant(
*, path=path, call=depends.dependency, security_scopes=use_security_scopes
depends: params.Depends,
dependency: Callable[..., Any],
path: str,
name: Optional[str] = None,
security_scopes: Optional[List[str]] = None,
) -> Dependant:
security_requirement = None
security_scopes = security_scopes or []
if isinstance(depends, params.Security):
if depends.scopes:
security_scopes.extend(depends.scopes)
if isinstance(dependency, SecurityBase):
use_scopes: List[str] = []
if isinstance(dependency, (OAuth2, OpenIdConnect)):
use_scopes = security_scopes
security_requirement = SecurityRequirement(
security_scheme=dependency, scopes=use_scopes
)
sub_dependant = get_dependant(
path=path,
call=dependency,
name=name,
security_scopes=security_scopes,
use_cache=depends.use_cache,
) )
if security_requirement:
sub_dependant.security_requirements.append(security_requirement)
return sub_dependant
CacheKey = Tuple[Optional[Callable[..., Any]], Tuple[str, ...]] CacheKey = Tuple[Optional[Callable[..., Any]], Tuple[str, ...]]
@ -285,13 +258,27 @@ def get_dependant(
) )
if param_details.depends is not None: if param_details.depends is not None:
assert param_details.depends.dependency assert param_details.depends.dependency
sub_dependant = get_sub_dependant( use_security_scopes = security_scopes or []
depends=param_details.depends, if isinstance(param_details.depends, params.Security):
dependency=param_details.depends.dependency, if param_details.depends.scopes:
use_security_scopes.extend(param_details.depends.scopes)
sub_dependant = get_dependant(
path=path, path=path,
call=param_details.depends.dependency,
name=param_name, name=param_name,
security_scopes=security_scopes, security_scopes=use_security_scopes,
use_cache=param_details.depends.use_cache,
) )
if isinstance(param_details.depends.dependency, SecurityBase):
use_scopes: List[str] = []
if isinstance(
param_details.depends.dependency, (OAuth2, OpenIdConnect)
):
use_scopes = use_security_scopes
security_requirement = SecurityRequirement(
security_scheme=param_details.depends.dependency, scopes=use_scopes
)
sub_dependant.security_requirements.append(security_requirement)
dependant.dependencies.append(sub_dependant) dependant.dependencies.append(sub_dependant)
continue continue
if add_non_field_param_to_dependency( if add_non_field_param_to_dependency(

View File

@ -0,0 +1,78 @@
from typing import Union
from fastapi import FastAPI, HTTPException, Security
from fastapi.security import (
OAuth2PasswordBearer,
SecurityScopes,
)
from fastapi.testclient import TestClient
from typing_extensions import Annotated
app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
def process_auth(
credentials: Annotated[Union[str, None], Security(oauth2_scheme)],
security_scopes: SecurityScopes,
):
# This is an incorrect way of using it, this is not checking if the scopes are
# provided by the token, only if the endpoint is requesting them, but the test
# here is just to check if FastAPI is indeed registering and passing the scopes
# correctly when using Security with parameterless dependencies.
if "a" not in security_scopes.scopes or "b" not in security_scopes.scopes:
raise HTTPException(detail="a or b not in scopes", status_code=401)
return {"token": credentials, "scopes": security_scopes.scopes}
@app.get("/get-credentials")
def get_credentials(
credentials: Annotated[dict, Security(process_auth, scopes=["a", "b"])],
):
return credentials
@app.get(
"/parameterless-with-scopes",
dependencies=[Security(process_auth, scopes=["a", "b"])],
)
def get_parameterless_with_scopes():
return {"status": "ok"}
@app.get(
"/parameterless-without-scopes",
dependencies=[Security(process_auth)],
)
def get_parameterless_without_scopes():
return {"status": "ok"}
client = TestClient(app)
def test_get_credentials():
response = client.get("/get-credentials", headers={"authorization": "Bearer token"})
assert response.status_code == 200, response.text
assert response.json() == {"token": "token", "scopes": ["a", "b"]}
def test_parameterless_with_scopes():
response = client.get(
"/parameterless-with-scopes", headers={"authorization": "Bearer token"}
)
assert response.status_code == 200, response.text
assert response.json() == {"status": "ok"}
def test_parameterless_without_scopes():
response = client.get(
"/parameterless-without-scopes", headers={"authorization": "Bearer token"}
)
assert response.status_code == 401, response.text
assert response.json() == {"detail": "a or b not in scopes"}
def test_call_get_parameterless_without_scopes_for_coverage():
assert get_parameterless_without_scopes() == {"status": "ok"}