mirror of https://github.com/tiangolo/fastapi.git
Merge 478cbf0d5a into 272204c0c7
This commit is contained in:
commit
79092d5db2
|
|
@ -21,6 +21,7 @@ from fastapi.security import (
|
|||
OAuth2,
|
||||
OAuth2AuthorizationCodeBearer,
|
||||
OAuth2PasswordBearer,
|
||||
OAuth2ClientCredentials,
|
||||
OAuth2PasswordRequestForm,
|
||||
OAuth2PasswordRequestFormStrict,
|
||||
OpenIdConnect,
|
||||
|
|
@ -58,6 +59,8 @@ from fastapi.security import (
|
|||
|
||||
::: fastapi.security.OAuth2PasswordBearer
|
||||
|
||||
::: fastapi.security.OAuth2ClientCredentials
|
||||
|
||||
## OAuth2 Password Form
|
||||
|
||||
::: fastapi.security.OAuth2PasswordRequestForm
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ from .http import HTTPBearer as HTTPBearer
|
|||
from .http import HTTPDigest as HTTPDigest
|
||||
from .oauth2 import OAuth2 as OAuth2
|
||||
from .oauth2 import OAuth2AuthorizationCodeBearer as OAuth2AuthorizationCodeBearer
|
||||
from .oauth2 import OAuth2ClientCredentials as OAuth2ClientCredentials
|
||||
from .oauth2 import OAuth2PasswordBearer as OAuth2PasswordBearer
|
||||
from .oauth2 import OAuth2PasswordRequestForm as OAuth2PasswordRequestForm
|
||||
from .oauth2 import OAuth2PasswordRequestFormStrict as OAuth2PasswordRequestFormStrict
|
||||
|
|
|
|||
|
|
@ -623,6 +623,105 @@ class OAuth2AuthorizationCodeBearer(OAuth2):
|
|||
return param
|
||||
|
||||
|
||||
class OAuth2ClientCredentials(OAuth2):
|
||||
"""
|
||||
OAuth2 flow for authentication using a bearer token obtained with an OAuth2 client
|
||||
credentials flow. An instance of it would be used as a dependency.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
tokenUrl: Annotated[
|
||||
str,
|
||||
Doc(
|
||||
"""
|
||||
The URL to obtain the OAuth2 token.
|
||||
"""
|
||||
),
|
||||
],
|
||||
scheme_name: Annotated[
|
||||
Optional[str],
|
||||
Doc(
|
||||
"""
|
||||
Security scheme name.
|
||||
|
||||
It will be included in the generated OpenAPI (e.g. visible at `/docs`).
|
||||
"""
|
||||
),
|
||||
] = None,
|
||||
scopes: Annotated[
|
||||
Optional[Dict[str, str]],
|
||||
Doc(
|
||||
"""
|
||||
The OAuth2 scopes that would be required by the *path operations* that
|
||||
use this dependency.
|
||||
"""
|
||||
),
|
||||
] = None,
|
||||
description: Annotated[
|
||||
Optional[str],
|
||||
Doc(
|
||||
"""
|
||||
Security scheme description.
|
||||
|
||||
It will be included in the generated OpenAPI (e.g. visible at `/docs`).
|
||||
"""
|
||||
),
|
||||
] = None,
|
||||
auto_error: Annotated[
|
||||
bool,
|
||||
Doc(
|
||||
"""
|
||||
By default, if no HTTP Authorization header is provided, required for
|
||||
OAuth2 authentication, it will automatically cancel the request and
|
||||
send the client an error.
|
||||
|
||||
If `auto_error` is set to `False`, when the HTTP Authorization header
|
||||
is not available, instead of erroring out, the dependency result will
|
||||
be `None`.
|
||||
|
||||
This is useful when you want to have optional authentication.
|
||||
|
||||
It is also useful when you want to have authentication that can be
|
||||
provided in one of multiple optional ways (for example, with OAuth2
|
||||
or in a cookie).
|
||||
"""
|
||||
),
|
||||
] = True,
|
||||
):
|
||||
if not scopes:
|
||||
scopes = {}
|
||||
flows = OAuthFlowsModel(
|
||||
clientCredentials=cast(
|
||||
Any,
|
||||
{
|
||||
"tokenUrl": tokenUrl,
|
||||
"scopes": scopes,
|
||||
},
|
||||
)
|
||||
)
|
||||
super().__init__(
|
||||
flows=flows,
|
||||
scheme_name=scheme_name,
|
||||
description=description,
|
||||
auto_error=auto_error,
|
||||
)
|
||||
|
||||
async def __call__(self, request: Request) -> Optional[str]:
|
||||
authorization = request.headers.get("Authorization")
|
||||
scheme, param = get_authorization_scheme_param(authorization)
|
||||
if not authorization or scheme.lower() != "bearer":
|
||||
if self.auto_error:
|
||||
raise HTTPException(
|
||||
status_code=HTTP_401_UNAUTHORIZED,
|
||||
detail="Not authenticated",
|
||||
headers={"WWW-Authenticate": "Bearer"},
|
||||
)
|
||||
else:
|
||||
return None
|
||||
return param
|
||||
|
||||
|
||||
class SecurityScopes:
|
||||
"""
|
||||
This is a special class that you can define in a parameter in a dependency to
|
||||
|
|
|
|||
|
|
@ -0,0 +1,72 @@
|
|||
from typing import Optional
|
||||
|
||||
from fastapi import FastAPI, Security
|
||||
from fastapi.security import OAuth2ClientCredentials
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
oauth2_scheme = OAuth2ClientCredentials(tokenUrl="token", auto_error=True)
|
||||
|
||||
|
||||
@app.get("/items/")
|
||||
async def read_items(token: Optional[str] = Security(oauth2_scheme)):
|
||||
return {"token": token}
|
||||
|
||||
|
||||
client = TestClient(app)
|
||||
|
||||
|
||||
def test_no_token():
|
||||
response = client.get("/items")
|
||||
assert response.status_code == 401, response.text
|
||||
assert response.json() == {"detail": "Not authenticated"}
|
||||
|
||||
|
||||
def test_incorrect_token():
|
||||
response = client.get("/items", headers={"Authorization": "Non-existent testtoken"})
|
||||
assert response.status_code == 401, response.text
|
||||
assert response.json() == {"detail": "Not authenticated"}
|
||||
|
||||
|
||||
def test_token():
|
||||
response = client.get("/items", headers={"Authorization": "Bearer testtoken"})
|
||||
assert response.status_code == 200, response.text
|
||||
assert response.json() == {"token": "testtoken"}
|
||||
|
||||
|
||||
def test_openapi_schema():
|
||||
response = client.get("/openapi.json")
|
||||
assert response.status_code == 200, response.text
|
||||
assert response.json() == {
|
||||
"openapi": "3.1.0",
|
||||
"info": {"title": "FastAPI", "version": "0.1.0"},
|
||||
"paths": {
|
||||
"/items/": {
|
||||
"get": {
|
||||
"responses": {
|
||||
"200": {
|
||||
"description": "Successful Response",
|
||||
"content": {"application/json": {"schema": {}}},
|
||||
}
|
||||
},
|
||||
"summary": "Read Items",
|
||||
"operationId": "read_items_items__get",
|
||||
"security": [{"OAuth2ClientCredentials": []}],
|
||||
}
|
||||
}
|
||||
},
|
||||
"components": {
|
||||
"securitySchemes": {
|
||||
"OAuth2ClientCredentials": {
|
||||
"type": "oauth2",
|
||||
"flows": {
|
||||
"clientCredentials": {
|
||||
"tokenUrl": "token",
|
||||
"scopes": {},
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
},
|
||||
}
|
||||
|
|
@ -0,0 +1,77 @@
|
|||
from typing import Optional
|
||||
|
||||
from fastapi import FastAPI, Security
|
||||
from fastapi.security import OAuth2ClientCredentials
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
oauth2_scheme = OAuth2ClientCredentials(
|
||||
tokenUrl="token",
|
||||
description="OAuth2 Client Credentials Flow",
|
||||
auto_error=True,
|
||||
)
|
||||
|
||||
|
||||
@app.get("/items/")
|
||||
async def read_items(token: Optional[str] = Security(oauth2_scheme)):
|
||||
return {"token": token}
|
||||
|
||||
|
||||
client = TestClient(app)
|
||||
|
||||
|
||||
def test_no_token():
|
||||
response = client.get("/items")
|
||||
assert response.status_code == 401, response.text
|
||||
assert response.json() == {"detail": "Not authenticated"}
|
||||
|
||||
|
||||
def test_incorrect_token():
|
||||
response = client.get("/items", headers={"Authorization": "Non-existent testtoken"})
|
||||
assert response.status_code == 401, response.text
|
||||
assert response.json() == {"detail": "Not authenticated"}
|
||||
|
||||
|
||||
def test_token():
|
||||
response = client.get("/items", headers={"Authorization": "Bearer testtoken"})
|
||||
assert response.status_code == 200, response.text
|
||||
assert response.json() == {"token": "testtoken"}
|
||||
|
||||
|
||||
def test_openapi_schema():
|
||||
response = client.get("/openapi.json")
|
||||
assert response.status_code == 200, response.text
|
||||
assert response.json() == {
|
||||
"openapi": "3.1.0",
|
||||
"info": {"title": "FastAPI", "version": "0.1.0"},
|
||||
"paths": {
|
||||
"/items/": {
|
||||
"get": {
|
||||
"responses": {
|
||||
"200": {
|
||||
"description": "Successful Response",
|
||||
"content": {"application/json": {"schema": {}}},
|
||||
}
|
||||
},
|
||||
"summary": "Read Items",
|
||||
"operationId": "read_items_items__get",
|
||||
"security": [{"OAuth2ClientCredentials": []}],
|
||||
}
|
||||
}
|
||||
},
|
||||
"components": {
|
||||
"securitySchemes": {
|
||||
"OAuth2ClientCredentials": {
|
||||
"type": "oauth2",
|
||||
"flows": {
|
||||
"clientCredentials": {
|
||||
"tokenUrl": "token",
|
||||
"scopes": {},
|
||||
}
|
||||
},
|
||||
"description": "OAuth2 Client Credentials Flow",
|
||||
}
|
||||
}
|
||||
},
|
||||
}
|
||||
|
|
@ -0,0 +1,72 @@
|
|||
from typing import Optional
|
||||
|
||||
from fastapi import FastAPI, Security
|
||||
from fastapi.security import OAuth2ClientCredentials
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
oauth2_scheme = OAuth2ClientCredentials(tokenUrl="token", auto_error=False)
|
||||
|
||||
|
||||
@app.get("/items/")
|
||||
async def read_items(token: Optional[str] = Security(oauth2_scheme)):
|
||||
return {"token": token}
|
||||
|
||||
|
||||
client = TestClient(app)
|
||||
|
||||
|
||||
def test_no_token():
|
||||
response = client.get("/items")
|
||||
assert response.status_code == 200, response.text
|
||||
assert response.json() == {"token": None}
|
||||
|
||||
|
||||
def test_incorrect_token():
|
||||
response = client.get("/items", headers={"Authorization": "Non-existent testtoken"})
|
||||
assert response.status_code == 200, response.text
|
||||
assert response.json() == {"token": None}
|
||||
|
||||
|
||||
def test_token():
|
||||
response = client.get("/items", headers={"Authorization": "Bearer testtoken"})
|
||||
assert response.status_code == 200, response.text
|
||||
assert response.json() == {"token": "testtoken"}
|
||||
|
||||
|
||||
def test_openapi_schema():
|
||||
response = client.get("/openapi.json")
|
||||
assert response.status_code == 200, response.text
|
||||
assert response.json() == {
|
||||
"openapi": "3.1.0",
|
||||
"info": {"title": "FastAPI", "version": "0.1.0"},
|
||||
"paths": {
|
||||
"/items/": {
|
||||
"get": {
|
||||
"responses": {
|
||||
"200": {
|
||||
"description": "Successful Response",
|
||||
"content": {"application/json": {"schema": {}}},
|
||||
}
|
||||
},
|
||||
"summary": "Read Items",
|
||||
"operationId": "read_items_items__get",
|
||||
"security": [{"OAuth2ClientCredentials": []}],
|
||||
}
|
||||
}
|
||||
},
|
||||
"components": {
|
||||
"securitySchemes": {
|
||||
"OAuth2ClientCredentials": {
|
||||
"type": "oauth2",
|
||||
"flows": {
|
||||
"clientCredentials": {
|
||||
"tokenUrl": "token",
|
||||
"scopes": {},
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
},
|
||||
}
|
||||
Loading…
Reference in New Issue