This commit is contained in:
Mix 2026-02-06 19:06:37 +00:00 committed by GitHub
commit 518934c699
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 47 additions and 20 deletions

View File

@ -4,7 +4,7 @@ from annotated_doc import Doc
from fastapi.openapi.models import APIKey, APIKeyIn
from fastapi.security.base import SecurityBase
from starlette.exceptions import HTTPException
from starlette.requests import Request
from starlette.requests import HTTPConnection
from starlette.status import HTTP_401_UNAUTHORIZED
@ -137,7 +137,7 @@ class APIKeyQuery(APIKeyBase):
auto_error=auto_error,
)
async def __call__(self, request: Request) -> Optional[str]:
async def __call__(self, request: HTTPConnection) -> Optional[str]:
api_key = request.query_params.get(self.model.name)
return self.check_api_key(api_key)
@ -225,7 +225,7 @@ class APIKeyHeader(APIKeyBase):
auto_error=auto_error,
)
async def __call__(self, request: Request) -> Optional[str]:
async def __call__(self, request: HTTPConnection) -> Optional[str]:
api_key = request.headers.get(self.model.name)
return self.check_api_key(api_key)
@ -313,6 +313,6 @@ class APIKeyCookie(APIKeyBase):
auto_error=auto_error,
)
async def __call__(self, request: Request) -> Optional[str]:
async def __call__(self, request: HTTPConnection) -> Optional[str]:
api_key = request.cookies.get(self.model.name)
return self.check_api_key(api_key)

View File

@ -9,7 +9,7 @@ from fastapi.openapi.models import HTTPBearer as HTTPBearerModel
from fastapi.security.base import SecurityBase
from fastapi.security.utils import get_authorization_scheme_param
from pydantic import BaseModel
from starlette.requests import Request
from starlette.requests import HTTPConnection
from starlette.status import HTTP_401_UNAUTHORIZED
@ -92,9 +92,9 @@ class HTTPBase(SecurityBase):
)
async def __call__(
self, request: Request
self, conn: HTTPConnection
) -> Optional[HTTPAuthorizationCredentials]:
authorization = request.headers.get("Authorization")
authorization = conn.headers.get("Authorization")
scheme, credentials = get_authorization_scheme_param(authorization)
if not (authorization and scheme and credentials):
if self.auto_error:
@ -202,9 +202,9 @@ class HTTPBasic(HTTPBase):
return {"WWW-Authenticate": "Basic"}
async def __call__( # type: ignore
self, request: Request
self, conn: HTTPConnection
) -> Optional[HTTPBasicCredentials]:
authorization = request.headers.get("Authorization")
authorization = conn.headers.get("Authorization")
scheme, param = get_authorization_scheme_param(authorization)
if not authorization or scheme.lower() != "basic":
if self.auto_error:
@ -303,9 +303,9 @@ class HTTPBearer(HTTPBase):
self.auto_error = auto_error
async def __call__(
self, request: Request
self, conn: HTTPConnection
) -> Optional[HTTPAuthorizationCredentials]:
authorization = request.headers.get("Authorization")
authorization = conn.headers.get("Authorization")
scheme, credentials = get_authorization_scheme_param(authorization)
if not (authorization and scheme and credentials):
if self.auto_error:
@ -406,9 +406,9 @@ class HTTPDigest(HTTPBase):
self.auto_error = auto_error
async def __call__(
self, request: Request
self, conn: HTTPConnection
) -> Optional[HTTPAuthorizationCredentials]:
authorization = request.headers.get("Authorization")
authorization = conn.headers.get("Authorization")
scheme, credentials = get_authorization_scheme_param(authorization)
if not (authorization and scheme and credentials):
if self.auto_error:

View File

@ -7,7 +7,7 @@ from fastapi.openapi.models import OAuthFlows as OAuthFlowsModel
from fastapi.param_functions import Form
from fastapi.security.base import SecurityBase
from fastapi.security.utils import get_authorization_scheme_param
from starlette.requests import Request
from starlette.requests import HTTPConnection
from starlette.status import HTTP_401_UNAUTHORIZED
@ -420,7 +420,7 @@ class OAuth2(SecurityBase):
headers={"WWW-Authenticate": "Bearer"},
)
async def __call__(self, request: Request) -> Optional[str]:
async def __call__(self, request: HTTPConnection) -> Optional[str]:
authorization = request.headers.get("Authorization")
if not authorization:
if self.auto_error:
@ -533,7 +533,7 @@ class OAuth2PasswordBearer(OAuth2):
auto_error=auto_error,
)
async def __call__(self, request: Request) -> Optional[str]:
async def __call__(self, request: HTTPConnection) -> Optional[str]:
authorization = request.headers.get("Authorization")
scheme, param = get_authorization_scheme_param(authorization)
if not authorization or scheme.lower() != "bearer":
@ -639,7 +639,7 @@ class OAuth2AuthorizationCodeBearer(OAuth2):
auto_error=auto_error,
)
async def __call__(self, request: Request) -> Optional[str]:
async def __call__(self, request: HTTPConnection) -> Optional[str]:
authorization = request.headers.get("Authorization")
scheme, param = get_authorization_scheme_param(authorization)
if not authorization or scheme.lower() != "bearer":

View File

@ -4,7 +4,7 @@ from annotated_doc import Doc
from fastapi.openapi.models import OpenIdConnect as OpenIdConnectModel
from fastapi.security.base import SecurityBase
from starlette.exceptions import HTTPException
from starlette.requests import Request
from starlette.requests import HTTPConnection
from starlette.status import HTTP_401_UNAUTHORIZED
@ -84,7 +84,7 @@ class OpenIdConnect(SecurityBase):
headers={"WWW-Authenticate": "Bearer"},
)
async def __call__(self, request: Request) -> Optional[str]:
async def __call__(self, request: HTTPConnection) -> Optional[str]:
authorization = request.headers.get("Authorization")
if not authorization:
if self.auto_error:

View File

@ -1,6 +1,6 @@
from typing import Optional
from fastapi import FastAPI, Security
from fastapi import FastAPI, Security, WebSocket
from fastapi.security.http import HTTPAuthorizationCredentials, HTTPBase
from fastapi.testclient import TestClient
@ -18,6 +18,19 @@ def read_current_user(
return {"scheme": credentials.scheme, "credentials": credentials.credentials}
@app.websocket("/users/timeline")
async def read_user_timeline(
websocket: WebSocket,
credentials: Optional[HTTPAuthorizationCredentials] = Security(security),
):
await websocket.accept()
await websocket.send_json(
{"scheme": credentials.scheme, "credentials": credentials.credentials}
if credentials
else {"msg": "Create an account first"}
)
client = TestClient(app)
@ -33,6 +46,20 @@ def test_security_http_base_no_credentials():
assert response.json() == {"msg": "Create an account first"}
def test_security_http_base_with_ws():
with client.websocket_connect(
"/users/timeline", headers={"Authorization": "Other foobar"}
) as websocket:
data = websocket.receive_json()
assert data == {"scheme": "Other", "credentials": "foobar"}
def test_security_http_base_with_ws_no_credentials():
with client.websocket_connect("/users/timeline") as websocket:
data = websocket.receive_json()
assert data == {"msg": "Create an account first"}
def test_openapi_schema():
response = client.get("/openapi.json")
assert response.status_code == 200, response.text