mirror of https://github.com/tiangolo/fastapi.git
✨ Add automatic header handling for HTTP Basic Auth (#175)
* ✨ Add automatic header handling for HTTP Basic Auth * 🎨 Remove obsolete comment
This commit is contained in:
parent
a4558e7053
commit
f216d340ec
|
|
@ -2,6 +2,7 @@ import binascii
|
||||||
from base64 import b64decode
|
from base64 import b64decode
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
|
from fastapi.exceptions import HTTPException
|
||||||
from fastapi.openapi.models import (
|
from fastapi.openapi.models import (
|
||||||
HTTPBase as HTTPBaseModel,
|
HTTPBase as HTTPBaseModel,
|
||||||
HTTPBearer as HTTPBearerModel,
|
HTTPBearer as HTTPBearerModel,
|
||||||
|
|
@ -9,9 +10,8 @@ from fastapi.openapi.models import (
|
||||||
from fastapi.security.base import SecurityBase
|
from fastapi.security.base import SecurityBase
|
||||||
from fastapi.security.utils import get_authorization_scheme_param
|
from fastapi.security.utils import get_authorization_scheme_param
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
from starlette.exceptions import HTTPException
|
|
||||||
from starlette.requests import Request
|
from starlette.requests import Request
|
||||||
from starlette.status import HTTP_403_FORBIDDEN
|
from starlette.status import HTTP_401_UNAUTHORIZED, HTTP_403_FORBIDDEN
|
||||||
|
|
||||||
|
|
||||||
class HTTPBasicCredentials(BaseModel):
|
class HTTPBasicCredentials(BaseModel):
|
||||||
|
|
@ -59,15 +59,21 @@ class HTTPBasic(HTTPBase):
|
||||||
async def __call__(self, request: Request) -> Optional[HTTPBasicCredentials]:
|
async def __call__(self, request: Request) -> Optional[HTTPBasicCredentials]:
|
||||||
authorization: str = request.headers.get("Authorization")
|
authorization: str = request.headers.get("Authorization")
|
||||||
scheme, param = get_authorization_scheme_param(authorization)
|
scheme, param = get_authorization_scheme_param(authorization)
|
||||||
# before implementing headers with 401 errors, wait for: https://github.com/encode/starlette/issues/295
|
if self.realm:
|
||||||
# unauthorized_headers = {"WWW-Authenticate": "Basic"}
|
unauthorized_headers = {"WWW-Authenticate": f'Basic realm="{self.realm}"'}
|
||||||
|
else:
|
||||||
|
unauthorized_headers = {"WWW-Authenticate": "Basic"}
|
||||||
invalid_user_credentials_exc = HTTPException(
|
invalid_user_credentials_exc = HTTPException(
|
||||||
status_code=HTTP_403_FORBIDDEN, detail="Invalid authentication credentials"
|
status_code=HTTP_401_UNAUTHORIZED,
|
||||||
|
detail="Invalid authentication credentials",
|
||||||
|
headers=unauthorized_headers,
|
||||||
)
|
)
|
||||||
if not authorization or scheme.lower() != "basic":
|
if not authorization or scheme.lower() != "basic":
|
||||||
if self.auto_error:
|
if self.auto_error:
|
||||||
raise HTTPException(
|
raise HTTPException(
|
||||||
status_code=HTTP_403_FORBIDDEN, detail="Not authenticated"
|
status_code=HTTP_401_UNAUTHORIZED,
|
||||||
|
detail="Not authenticated",
|
||||||
|
headers=unauthorized_headers,
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
return None
|
return None
|
||||||
|
|
@ -87,7 +93,7 @@ class HTTPBearer(HTTPBase):
|
||||||
*,
|
*,
|
||||||
bearerFormat: str = None,
|
bearerFormat: str = None,
|
||||||
scheme_name: str = None,
|
scheme_name: str = None,
|
||||||
auto_error: bool = True
|
auto_error: bool = True,
|
||||||
):
|
):
|
||||||
self.model = HTTPBearerModel(bearerFormat=bearerFormat)
|
self.model = HTTPBearerModel(bearerFormat=bearerFormat)
|
||||||
self.scheme_name = scheme_name or self.__class__.__name__
|
self.scheme_name = scheme_name or self.__class__.__name__
|
||||||
|
|
|
||||||
|
|
@ -56,15 +56,17 @@ def test_security_http_basic():
|
||||||
|
|
||||||
def test_security_http_basic_no_credentials():
|
def test_security_http_basic_no_credentials():
|
||||||
response = client.get("/users/me")
|
response = client.get("/users/me")
|
||||||
assert response.status_code == 403
|
|
||||||
assert response.json() == {"detail": "Not authenticated"}
|
assert response.json() == {"detail": "Not authenticated"}
|
||||||
|
assert response.status_code == 401
|
||||||
|
assert response.headers["WWW-Authenticate"] == "Basic"
|
||||||
|
|
||||||
|
|
||||||
def test_security_http_basic_invalid_credentials():
|
def test_security_http_basic_invalid_credentials():
|
||||||
response = client.get(
|
response = client.get(
|
||||||
"/users/me", headers={"Authorization": "Basic notabase64token"}
|
"/users/me", headers={"Authorization": "Basic notabase64token"}
|
||||||
)
|
)
|
||||||
assert response.status_code == 403
|
assert response.status_code == 401
|
||||||
|
assert response.headers["WWW-Authenticate"] == "Basic"
|
||||||
assert response.json() == {"detail": "Invalid authentication credentials"}
|
assert response.json() == {"detail": "Invalid authentication credentials"}
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -72,5 +74,6 @@ def test_security_http_basic_non_basic_credentials():
|
||||||
payload = b64encode(b"johnsecret").decode("ascii")
|
payload = b64encode(b"johnsecret").decode("ascii")
|
||||||
auth_header = f"Basic {payload}"
|
auth_header = f"Basic {payload}"
|
||||||
response = client.get("/users/me", headers={"Authorization": auth_header})
|
response = client.get("/users/me", headers={"Authorization": auth_header})
|
||||||
assert response.status_code == 403
|
assert response.status_code == 401
|
||||||
|
assert response.headers["WWW-Authenticate"] == "Basic"
|
||||||
assert response.json() == {"detail": "Invalid authentication credentials"}
|
assert response.json() == {"detail": "Invalid authentication credentials"}
|
||||||
|
|
|
||||||
|
|
@ -67,7 +67,8 @@ def test_security_http_basic_invalid_credentials():
|
||||||
response = client.get(
|
response = client.get(
|
||||||
"/users/me", headers={"Authorization": "Basic notabase64token"}
|
"/users/me", headers={"Authorization": "Basic notabase64token"}
|
||||||
)
|
)
|
||||||
assert response.status_code == 403
|
assert response.status_code == 401
|
||||||
|
assert response.headers["WWW-Authenticate"] == "Basic"
|
||||||
assert response.json() == {"detail": "Invalid authentication credentials"}
|
assert response.json() == {"detail": "Invalid authentication credentials"}
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -75,5 +76,6 @@ def test_security_http_basic_non_basic_credentials():
|
||||||
payload = b64encode(b"johnsecret").decode("ascii")
|
payload = b64encode(b"johnsecret").decode("ascii")
|
||||||
auth_header = f"Basic {payload}"
|
auth_header = f"Basic {payload}"
|
||||||
response = client.get("/users/me", headers={"Authorization": auth_header})
|
response = client.get("/users/me", headers={"Authorization": auth_header})
|
||||||
assert response.status_code == 403
|
assert response.status_code == 401
|
||||||
|
assert response.headers["WWW-Authenticate"] == "Basic"
|
||||||
assert response.json() == {"detail": "Invalid authentication credentials"}
|
assert response.json() == {"detail": "Invalid authentication credentials"}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,79 @@
|
||||||
|
from base64 import b64encode
|
||||||
|
|
||||||
|
from fastapi import FastAPI, Security
|
||||||
|
from fastapi.security import HTTPBasic, HTTPBasicCredentials
|
||||||
|
from requests.auth import HTTPBasicAuth
|
||||||
|
from starlette.testclient import TestClient
|
||||||
|
|
||||||
|
app = FastAPI()
|
||||||
|
|
||||||
|
security = HTTPBasic(realm="simple")
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/users/me")
|
||||||
|
def read_current_user(credentials: HTTPBasicCredentials = Security(security)):
|
||||||
|
return {"username": credentials.username, "password": credentials.password}
|
||||||
|
|
||||||
|
|
||||||
|
client = TestClient(app)
|
||||||
|
|
||||||
|
openapi_schema = {
|
||||||
|
"openapi": "3.0.2",
|
||||||
|
"info": {"title": "Fast API", "version": "0.1.0"},
|
||||||
|
"paths": {
|
||||||
|
"/users/me": {
|
||||||
|
"get": {
|
||||||
|
"responses": {
|
||||||
|
"200": {
|
||||||
|
"description": "Successful Response",
|
||||||
|
"content": {"application/json": {"schema": {}}},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"summary": "Read Current User",
|
||||||
|
"operationId": "read_current_user_users_me_get",
|
||||||
|
"security": [{"HTTPBasic": []}],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"components": {
|
||||||
|
"securitySchemes": {"HTTPBasic": {"type": "http", "scheme": "basic"}}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def test_openapi_schema():
|
||||||
|
response = client.get("/openapi.json")
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.json() == openapi_schema
|
||||||
|
|
||||||
|
|
||||||
|
def test_security_http_basic():
|
||||||
|
auth = HTTPBasicAuth(username="john", password="secret")
|
||||||
|
response = client.get("/users/me", auth=auth)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.json() == {"username": "john", "password": "secret"}
|
||||||
|
|
||||||
|
|
||||||
|
def test_security_http_basic_no_credentials():
|
||||||
|
response = client.get("/users/me")
|
||||||
|
assert response.json() == {"detail": "Not authenticated"}
|
||||||
|
assert response.status_code == 401
|
||||||
|
assert response.headers["WWW-Authenticate"] == 'Basic realm="simple"'
|
||||||
|
|
||||||
|
|
||||||
|
def test_security_http_basic_invalid_credentials():
|
||||||
|
response = client.get(
|
||||||
|
"/users/me", headers={"Authorization": "Basic notabase64token"}
|
||||||
|
)
|
||||||
|
assert response.status_code == 401
|
||||||
|
assert response.headers["WWW-Authenticate"] == 'Basic realm="simple"'
|
||||||
|
assert response.json() == {"detail": "Invalid authentication credentials"}
|
||||||
|
|
||||||
|
|
||||||
|
def test_security_http_basic_non_basic_credentials():
|
||||||
|
payload = b64encode(b"johnsecret").decode("ascii")
|
||||||
|
auth_header = f"Basic {payload}"
|
||||||
|
response = client.get("/users/me", headers={"Authorization": auth_header})
|
||||||
|
assert response.status_code == 401
|
||||||
|
assert response.headers["WWW-Authenticate"] == 'Basic realm="simple"'
|
||||||
|
assert response.json() == {"detail": "Invalid authentication credentials"}
|
||||||
Loading…
Reference in New Issue