diff --git a/fastapi/security/api_key.py b/fastapi/security/api_key.py index 89358a913..fa330906f 100644 --- a/fastapi/security/api_key.py +++ b/fastapi/security/api_key.py @@ -4,7 +4,7 @@ from annotated_doc import Doc from fastapi.openapi.models import APIKey, APIKeyIn from fastapi.security.base import SecurityBase from starlette.exceptions import HTTPException -from starlette.requests import Request +from starlette.requests import HTTPConnection from starlette.status import HTTP_401_UNAUTHORIZED @@ -137,8 +137,8 @@ class APIKeyQuery(APIKeyBase): auto_error=auto_error, ) - async def __call__(self, request: Request) -> str | None: - api_key = request.query_params.get(self.model.name) + async def __call__(self, conn: HTTPConnection) -> str | None: + api_key = conn.query_params.get(self.model.name) return self.check_api_key(api_key) @@ -225,8 +225,8 @@ class APIKeyHeader(APIKeyBase): auto_error=auto_error, ) - async def __call__(self, request: Request) -> str | None: - api_key = request.headers.get(self.model.name) + async def __call__(self, conn: HTTPConnection) -> str | None: + api_key = conn.headers.get(self.model.name) return self.check_api_key(api_key) @@ -313,6 +313,6 @@ class APIKeyCookie(APIKeyBase): auto_error=auto_error, ) - async def __call__(self, request: Request) -> str | None: - api_key = request.cookies.get(self.model.name) + async def __call__(self, conn: HTTPConnection) -> str | None: + api_key = conn.cookies.get(self.model.name) return self.check_api_key(api_key) diff --git a/fastapi/security/http.py b/fastapi/security/http.py index 05299323c..c7fd065b9 100644 --- a/fastapi/security/http.py +++ b/fastapi/security/http.py @@ -9,7 +9,7 @@ from fastapi.openapi.models import HTTPBearer as HTTPBearerModel from fastapi.security.base import SecurityBase from fastapi.security.utils import get_authorization_scheme_param from pydantic import BaseModel -from starlette.requests import Request +from starlette.requests import HTTPConnection from starlette.status import HTTP_401_UNAUTHORIZED @@ -91,8 +91,10 @@ class HTTPBase(SecurityBase): headers=self.make_authenticate_headers(), ) - async def __call__(self, request: Request) -> HTTPAuthorizationCredentials | None: - authorization = request.headers.get("Authorization") + async def __call__( + self, conn: HTTPConnection + ) -> HTTPAuthorizationCredentials | None: + authorization = conn.headers.get("Authorization") scheme, credentials = get_authorization_scheme_param(authorization) if not (authorization and scheme and credentials): if self.auto_error: @@ -200,9 +202,9 @@ class HTTPBasic(HTTPBase): return {"WWW-Authenticate": "Basic"} async def __call__( # type: ignore - self, request: Request + self, conn: HTTPConnection ) -> HTTPBasicCredentials | None: - authorization = request.headers.get("Authorization") + authorization = conn.headers.get("Authorization") scheme, param = get_authorization_scheme_param(authorization) if not authorization or scheme.lower() != "basic": if self.auto_error: @@ -300,8 +302,10 @@ class HTTPBearer(HTTPBase): self.scheme_name = scheme_name or self.__class__.__name__ self.auto_error = auto_error - async def __call__(self, request: Request) -> HTTPAuthorizationCredentials | None: - authorization = request.headers.get("Authorization") + async def __call__( + self, conn: HTTPConnection + ) -> HTTPAuthorizationCredentials | None: + authorization = conn.headers.get("Authorization") scheme, credentials = get_authorization_scheme_param(authorization) if not (authorization and scheme and credentials): if self.auto_error: @@ -401,8 +405,10 @@ class HTTPDigest(HTTPBase): self.scheme_name = scheme_name or self.__class__.__name__ self.auto_error = auto_error - async def __call__(self, request: Request) -> HTTPAuthorizationCredentials | None: - authorization = request.headers.get("Authorization") + async def __call__( + self, conn: HTTPConnection + ) -> HTTPAuthorizationCredentials | None: + authorization = conn.headers.get("Authorization") scheme, credentials = get_authorization_scheme_param(authorization) if not (authorization and scheme and credentials): if self.auto_error: diff --git a/fastapi/security/oauth2.py b/fastapi/security/oauth2.py index 661043ce7..ee62314d1 100644 --- a/fastapi/security/oauth2.py +++ b/fastapi/security/oauth2.py @@ -7,7 +7,7 @@ from fastapi.openapi.models import OAuthFlows as OAuthFlowsModel from fastapi.param_functions import Form from fastapi.security.base import SecurityBase from fastapi.security.utils import get_authorization_scheme_param -from starlette.requests import Request +from starlette.requests import HTTPConnection from starlette.status import HTTP_401_UNAUTHORIZED @@ -420,8 +420,8 @@ class OAuth2(SecurityBase): headers={"WWW-Authenticate": "Bearer"}, ) - async def __call__(self, request: Request) -> str | None: - authorization = request.headers.get("Authorization") + async def __call__(self, conn: HTTPConnection) -> str | None: + authorization = conn.headers.get("Authorization") if not authorization: if self.auto_error: raise self.make_not_authenticated_error() @@ -533,8 +533,8 @@ class OAuth2PasswordBearer(OAuth2): auto_error=auto_error, ) - async def __call__(self, request: Request) -> str | None: - authorization = request.headers.get("Authorization") + async def __call__(self, conn: HTTPConnection) -> str | None: + authorization = conn.headers.get("Authorization") scheme, param = get_authorization_scheme_param(authorization) if not authorization or scheme.lower() != "bearer": if self.auto_error: @@ -639,8 +639,8 @@ class OAuth2AuthorizationCodeBearer(OAuth2): auto_error=auto_error, ) - async def __call__(self, request: Request) -> str | None: - authorization = request.headers.get("Authorization") + async def __call__(self, conn: HTTPConnection) -> str | None: + authorization = conn.headers.get("Authorization") scheme, param = get_authorization_scheme_param(authorization) if not authorization or scheme.lower() != "bearer": if self.auto_error: diff --git a/fastapi/security/open_id_connect_url.py b/fastapi/security/open_id_connect_url.py index 1c6fcc744..516935982 100644 --- a/fastapi/security/open_id_connect_url.py +++ b/fastapi/security/open_id_connect_url.py @@ -4,7 +4,7 @@ from annotated_doc import Doc from fastapi.openapi.models import OpenIdConnect as OpenIdConnectModel from fastapi.security.base import SecurityBase from starlette.exceptions import HTTPException -from starlette.requests import Request +from starlette.requests import HTTPConnection from starlette.status import HTTP_401_UNAUTHORIZED @@ -84,8 +84,8 @@ class OpenIdConnect(SecurityBase): headers={"WWW-Authenticate": "Bearer"}, ) - async def __call__(self, request: Request) -> str | None: - authorization = request.headers.get("Authorization") + async def __call__(self, conn: HTTPConnection) -> str | None: + authorization = conn.headers.get("Authorization") if not authorization: if self.auto_error: raise self.make_not_authenticated_error() diff --git a/tests/test_security_http_base_optional.py b/tests/test_security_http_base_optional.py index 1d1944ab0..eae7bc0e6 100644 --- a/tests/test_security_http_base_optional.py +++ b/tests/test_security_http_base_optional.py @@ -1,4 +1,4 @@ -from fastapi import FastAPI, Security +from fastapi import FastAPI, Security, WebSocket from fastapi.security.http import HTTPAuthorizationCredentials, HTTPBase from fastapi.testclient import TestClient from inline_snapshot import snapshot @@ -17,6 +17,19 @@ def read_current_user( return {"scheme": credentials.scheme, "credentials": credentials.credentials} +@app.websocket("/users/timeline") +async def read_user_timeline( + websocket: WebSocket, + credentials: HTTPAuthorizationCredentials | None = Security(security), +): + await websocket.accept() + await websocket.send_json( + {"scheme": credentials.scheme, "credentials": credentials.credentials} + if credentials + else {"msg": "Create an account first"} + ) + + client = TestClient(app) @@ -32,6 +45,20 @@ def test_security_http_base_no_credentials(): assert response.json() == {"msg": "Create an account first"} +def test_security_http_base_with_ws(): + with client.websocket_connect( + "/users/timeline", headers={"Authorization": "Other foobar"} + ) as websocket: + data = websocket.receive_json() + assert data == {"scheme": "Other", "credentials": "foobar"} + + +def test_security_http_base_with_ws_no_credentials(): + with client.websocket_connect("/users/timeline") as websocket: + data = websocket.receive_json() + assert data == {"msg": "Create an account first"} + + def test_openapi_schema(): response = client.get("/openapi.json") assert response.status_code == 200, response.text