mirror of https://github.com/tiangolo/fastapi.git
Add AsyncAPI support for Websockets
This commit is contained in:
parent
a4ad07b48a
commit
26a903f1e2
|
|
@ -18,11 +18,13 @@ from fastapi.exceptions import RequestValidationError, WebSocketRequestValidatio
|
|||
from fastapi.logger import logger
|
||||
from fastapi.middleware.asyncexitstack import AsyncExitStackMiddleware
|
||||
from fastapi.openapi.docs import (
|
||||
get_asyncapi_html,
|
||||
get_redoc_html,
|
||||
get_swagger_ui_html,
|
||||
get_swagger_ui_oauth2_redirect_html,
|
||||
)
|
||||
from fastapi.openapi.utils import get_openapi
|
||||
from fastapi.openapi.asyncapi_utils import get_asyncapi
|
||||
from fastapi.params import Depends
|
||||
from fastapi.types import DecoratedCallable, IncEx
|
||||
from fastapi.utils import generate_unique_id
|
||||
|
|
@ -446,6 +448,49 @@ class FastAPI(Starlette):
|
|||
"""
|
||||
),
|
||||
] = "/redoc",
|
||||
asyncapi_url: Annotated[
|
||||
str | None,
|
||||
Doc(
|
||||
"""
|
||||
The URL where the AsyncAPI schema will be served from.
|
||||
|
||||
If you set it to `None`, no AsyncAPI schema will be served publicly, and
|
||||
the default automatic endpoint `/asyncapi-docs` will also be disabled.
|
||||
|
||||
AsyncAPI is used to document WebSocket endpoints, similar to how OpenAPI
|
||||
documents HTTP endpoints.
|
||||
|
||||
**Example**
|
||||
|
||||
```python
|
||||
from fastapi import FastAPI
|
||||
|
||||
app = FastAPI(asyncapi_url="/api/v1/asyncapi.json")
|
||||
```
|
||||
"""
|
||||
),
|
||||
] = "/asyncapi.json",
|
||||
asyncapi_docs_url: Annotated[
|
||||
str | None,
|
||||
Doc(
|
||||
"""
|
||||
The URL where the AsyncAPI documentation UI will be served from.
|
||||
|
||||
If you set it to `None`, the AsyncAPI documentation UI will be disabled.
|
||||
|
||||
This provides an interactive UI for viewing WebSocket endpoint documentation,
|
||||
similar to how `/docs` provides Swagger UI for HTTP endpoints.
|
||||
|
||||
**Example**
|
||||
|
||||
```python
|
||||
from fastapi import FastAPI
|
||||
|
||||
app = FastAPI(asyncapi_docs_url="/async-docs")
|
||||
```
|
||||
"""
|
||||
),
|
||||
] = "/asyncapi-docs",
|
||||
swagger_ui_oauth2_redirect_url: Annotated[
|
||||
str | None,
|
||||
Doc(
|
||||
|
|
@ -886,6 +931,8 @@ class FastAPI(Starlette):
|
|||
self.root_path_in_servers = root_path_in_servers
|
||||
self.docs_url = docs_url
|
||||
self.redoc_url = redoc_url
|
||||
self.asyncapi_url = asyncapi_url
|
||||
self.asyncapi_docs_url = asyncapi_docs_url
|
||||
self.swagger_ui_oauth2_redirect_url = swagger_ui_oauth2_redirect_url
|
||||
self.swagger_ui_init_oauth = swagger_ui_init_oauth
|
||||
self.swagger_ui_parameters = swagger_ui_parameters
|
||||
|
|
@ -925,9 +972,13 @@ class FastAPI(Starlette):
|
|||
),
|
||||
] = "3.1.0"
|
||||
self.openapi_schema: dict[str, Any] | None = None
|
||||
self.asyncapi_schema: dict[str, Any] | None = None
|
||||
if self.openapi_url:
|
||||
assert self.title, "A title must be provided for OpenAPI, e.g.: 'My API'"
|
||||
assert self.version, "A version must be provided for OpenAPI, e.g.: '2.1.0'"
|
||||
if self.asyncapi_url:
|
||||
assert self.title, "A title must be provided for AsyncAPI, e.g.: 'My API'"
|
||||
assert self.version, "A version must be provided for AsyncAPI, e.g.: '2.1.0'"
|
||||
# TODO: remove when discarding the openapi_prefix parameter
|
||||
if openapi_prefix:
|
||||
logger.warning(
|
||||
|
|
@ -1099,6 +1150,36 @@ class FastAPI(Starlette):
|
|||
)
|
||||
return self.openapi_schema
|
||||
|
||||
def asyncapi(self) -> dict[str, Any]:
|
||||
"""
|
||||
Generate the AsyncAPI schema of the application. This is called by FastAPI
|
||||
internally.
|
||||
|
||||
The first time it is called it stores the result in the attribute
|
||||
`app.asyncapi_schema`, and next times it is called, it just returns that same
|
||||
result. To avoid the cost of generating the schema every time.
|
||||
|
||||
If you need to modify the generated AsyncAPI schema, you could modify it.
|
||||
|
||||
AsyncAPI is used to document WebSocket endpoints, similar to how OpenAPI
|
||||
documents HTTP endpoints.
|
||||
"""
|
||||
if not self.asyncapi_schema:
|
||||
self.asyncapi_schema = get_asyncapi(
|
||||
title=self.title,
|
||||
version=self.version,
|
||||
asyncapi_version="2.6.0",
|
||||
summary=self.summary,
|
||||
description=self.description,
|
||||
routes=self.routes,
|
||||
servers=self.servers,
|
||||
terms_of_service=self.terms_of_service,
|
||||
contact=self.contact,
|
||||
license_info=self.license_info,
|
||||
external_docs=self.openapi_external_docs,
|
||||
)
|
||||
return self.asyncapi_schema
|
||||
|
||||
def setup(self) -> None:
|
||||
if self.openapi_url:
|
||||
|
||||
|
|
@ -1115,6 +1196,21 @@ class FastAPI(Starlette):
|
|||
return JSONResponse(schema)
|
||||
|
||||
self.add_route(self.openapi_url, openapi, include_in_schema=False)
|
||||
if self.asyncapi_url:
|
||||
|
||||
async def asyncapi(req: Request) -> JSONResponse:
|
||||
root_path = req.scope.get("root_path", "").rstrip("/")
|
||||
schema = self.asyncapi()
|
||||
if root_path and self.root_path_in_servers:
|
||||
server_urls = {s.get("url") for s in schema.get("servers", [])}
|
||||
if root_path not in server_urls:
|
||||
schema = dict(schema)
|
||||
schema["servers"] = [{"url": root_path}] + schema.get(
|
||||
"servers", []
|
||||
)
|
||||
return JSONResponse(schema)
|
||||
|
||||
self.add_route(self.asyncapi_url, asyncapi, include_in_schema=False)
|
||||
if self.openapi_url and self.docs_url:
|
||||
|
||||
async def swagger_ui_html(req: Request) -> HTMLResponse:
|
||||
|
|
@ -1123,12 +1219,16 @@ class FastAPI(Starlette):
|
|||
oauth2_redirect_url = self.swagger_ui_oauth2_redirect_url
|
||||
if oauth2_redirect_url:
|
||||
oauth2_redirect_url = root_path + oauth2_redirect_url
|
||||
asyncapi_docs_url = None
|
||||
if self.asyncapi_docs_url:
|
||||
asyncapi_docs_url = root_path + self.asyncapi_docs_url
|
||||
return get_swagger_ui_html(
|
||||
openapi_url=openapi_url,
|
||||
title=f"{self.title} - Swagger UI",
|
||||
oauth2_redirect_url=oauth2_redirect_url,
|
||||
init_oauth=self.swagger_ui_init_oauth,
|
||||
swagger_ui_parameters=self.swagger_ui_parameters,
|
||||
asyncapi_docs_url=asyncapi_docs_url,
|
||||
)
|
||||
|
||||
self.add_route(self.docs_url, swagger_ui_html, include_in_schema=False)
|
||||
|
|
@ -1153,6 +1253,21 @@ class FastAPI(Starlette):
|
|||
)
|
||||
|
||||
self.add_route(self.redoc_url, redoc_html, include_in_schema=False)
|
||||
if self.asyncapi_url and self.asyncapi_docs_url:
|
||||
|
||||
async def asyncapi_ui_html(req: Request) -> HTMLResponse:
|
||||
root_path = req.scope.get("root_path", "").rstrip("/")
|
||||
asyncapi_url = root_path + self.asyncapi_url
|
||||
docs_url = root_path + self.docs_url if self.docs_url else None
|
||||
return get_asyncapi_html(
|
||||
asyncapi_url=asyncapi_url,
|
||||
title=f"{self.title} - AsyncAPI",
|
||||
docs_url=docs_url,
|
||||
)
|
||||
|
||||
self.add_route(
|
||||
self.asyncapi_docs_url, asyncapi_ui_html, include_in_schema=False
|
||||
)
|
||||
|
||||
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
|
||||
if self.root_path:
|
||||
|
|
|
|||
|
|
@ -0,0 +1,106 @@
|
|||
from collections.abc import Sequence
|
||||
from typing import Any
|
||||
|
||||
from fastapi import routing
|
||||
from fastapi.encoders import jsonable_encoder
|
||||
from starlette.routing import BaseRoute
|
||||
|
||||
|
||||
def get_asyncapi_channel(
|
||||
*,
|
||||
route: routing.APIWebSocketRoute,
|
||||
) -> dict[str, Any]:
|
||||
"""Generate AsyncAPI channel definition for a WebSocket route."""
|
||||
channel: dict[str, Any] = {}
|
||||
|
||||
# WebSocket channels typically have subscribe operation
|
||||
# (client subscribes to receive messages from server)
|
||||
operation: dict[str, Any] = {
|
||||
"operationId": route.name or f"websocket_{route.path_format}",
|
||||
}
|
||||
|
||||
# Basic message schema - can be enhanced later with actual message types
|
||||
# For WebSockets, messages can be sent in both directions
|
||||
message: dict[str, Any] = {
|
||||
"contentType": "application/json",
|
||||
}
|
||||
|
||||
operation["message"] = message
|
||||
channel["subscribe"] = operation
|
||||
|
||||
# WebSockets are bidirectional, so we also include publish
|
||||
# (client can publish messages to server)
|
||||
publish_operation: dict[str, Any] = {
|
||||
"operationId": f"{route.name or f'websocket_{route.path_format}'}_publish",
|
||||
"message": message,
|
||||
}
|
||||
channel["publish"] = publish_operation
|
||||
|
||||
return channel
|
||||
|
||||
|
||||
def get_asyncapi(
|
||||
*,
|
||||
title: str,
|
||||
version: str,
|
||||
asyncapi_version: str = "2.6.0",
|
||||
summary: str | None = None,
|
||||
description: str | None = None,
|
||||
routes: Sequence[BaseRoute],
|
||||
servers: list[dict[str, str | Any]] | None = None,
|
||||
terms_of_service: str | None = None,
|
||||
contact: dict[str, str | Any] | None = None,
|
||||
license_info: dict[str, str | Any] | None = None,
|
||||
external_docs: dict[str, Any] | None = None,
|
||||
) -> dict[str, Any]:
|
||||
"""
|
||||
Generate AsyncAPI schema from FastAPI application routes.
|
||||
|
||||
Filters for WebSocket routes and generates AsyncAPI 2.6.0 compliant schema.
|
||||
"""
|
||||
info: dict[str, Any] = {"title": title, "version": version}
|
||||
if summary:
|
||||
info["summary"] = summary
|
||||
if description:
|
||||
info["description"] = description
|
||||
if terms_of_service:
|
||||
info["termsOfService"] = terms_of_service
|
||||
if contact:
|
||||
info["contact"] = contact
|
||||
if license_info:
|
||||
info["license"] = license_info
|
||||
|
||||
output: dict[str, Any] = {"asyncapi": asyncapi_version, "info": info}
|
||||
|
||||
# Add default WebSocket server if no servers provided and we have WebSocket routes
|
||||
websocket_routes = [
|
||||
route for route in routes or [] if isinstance(route, routing.APIWebSocketRoute)
|
||||
]
|
||||
if websocket_routes and not servers:
|
||||
# Default WebSocket server - can be overridden by providing servers parameter
|
||||
output["servers"] = [
|
||||
{
|
||||
"url": "ws://localhost:8000",
|
||||
"protocol": "ws",
|
||||
"description": "WebSocket server",
|
||||
}
|
||||
]
|
||||
elif servers:
|
||||
output["servers"] = servers
|
||||
|
||||
channels: dict[str, dict[str, Any]] = {}
|
||||
|
||||
# Filter routes to only include WebSocket routes
|
||||
for route in routes or []:
|
||||
if isinstance(route, routing.APIWebSocketRoute):
|
||||
channel = get_asyncapi_channel(route=route)
|
||||
if channel:
|
||||
channels[route.path_format] = channel
|
||||
|
||||
output["channels"] = channels
|
||||
|
||||
if external_docs:
|
||||
output["externalDocs"] = external_docs
|
||||
|
||||
return jsonable_encoder(output, by_alias=True, exclude_none=True)
|
||||
|
||||
|
|
@ -133,6 +133,14 @@ def get_swagger_ui_html(
|
|||
"""
|
||||
),
|
||||
] = None,
|
||||
asyncapi_docs_url: Annotated[
|
||||
str | None,
|
||||
Doc(
|
||||
"""
|
||||
The URL to the AsyncAPI docs for navigation link.
|
||||
"""
|
||||
),
|
||||
] = None,
|
||||
) -> HTMLResponse:
|
||||
"""
|
||||
Generate and return the HTML that loads Swagger UI for the interactive
|
||||
|
|
@ -149,6 +157,17 @@ def get_swagger_ui_html(
|
|||
if swagger_ui_parameters:
|
||||
current_swagger_ui_parameters.update(swagger_ui_parameters)
|
||||
|
||||
navigation_html = ""
|
||||
if asyncapi_docs_url:
|
||||
navigation_html = f"""
|
||||
<div style="padding: 10px; background-color: #f5f5f5; border-bottom: 1px solid #ddd;">
|
||||
<span style="color: #666;">REST API Documentation</span>
|
||||
<a href="{asyncapi_docs_url}" style="color: #007bff; text-decoration: none; margin-left: 20px;">
|
||||
🔌 AsyncAPI Docs (WebSocket API)
|
||||
</a>
|
||||
</div>
|
||||
"""
|
||||
|
||||
html = f"""
|
||||
<!DOCTYPE html>
|
||||
<html>
|
||||
|
|
@ -159,6 +178,7 @@ def get_swagger_ui_html(
|
|||
<title>{title}</title>
|
||||
</head>
|
||||
<body>
|
||||
{navigation_html}
|
||||
<div id="swagger-ui">
|
||||
</div>
|
||||
<script src="{swagger_js_url}"></script>
|
||||
|
|
@ -194,6 +214,129 @@ def get_swagger_ui_html(
|
|||
return HTMLResponse(html)
|
||||
|
||||
|
||||
def get_asyncapi_html(
|
||||
*,
|
||||
asyncapi_url: Annotated[
|
||||
str,
|
||||
Doc(
|
||||
"""
|
||||
The AsyncAPI URL that AsyncAPI Studio should load and use.
|
||||
|
||||
This is normally done automatically by FastAPI using the default URL
|
||||
`/asyncapi.json`.
|
||||
|
||||
Read more about it in the
|
||||
[FastAPI docs for AsyncAPI](https://fastapi.tiangolo.com/advanced/asyncapi/).
|
||||
"""
|
||||
),
|
||||
],
|
||||
title: Annotated[
|
||||
str,
|
||||
Doc(
|
||||
"""
|
||||
The HTML `<title>` content, normally shown in the browser tab.
|
||||
"""
|
||||
),
|
||||
],
|
||||
asyncapi_js_url: Annotated[
|
||||
str,
|
||||
Doc(
|
||||
"""
|
||||
The URL to use to load the AsyncAPI Studio JavaScript.
|
||||
|
||||
It is normally set to a CDN URL.
|
||||
"""
|
||||
),
|
||||
] = "https://unpkg.com/@asyncapi/react-component@latest/browser/standalone/index.js",
|
||||
asyncapi_favicon_url: Annotated[
|
||||
str,
|
||||
Doc(
|
||||
"""
|
||||
The URL of the favicon to use. It is normally shown in the browser tab.
|
||||
"""
|
||||
),
|
||||
] = "https://fastapi.tiangolo.com/img/favicon.png",
|
||||
docs_url: Annotated[
|
||||
str | None,
|
||||
Doc(
|
||||
"""
|
||||
The URL to the OpenAPI docs (Swagger UI) for navigation link.
|
||||
"""
|
||||
),
|
||||
] = None,
|
||||
) -> HTMLResponse:
|
||||
"""
|
||||
Generate and return the HTML that loads AsyncAPI Studio for the interactive
|
||||
WebSocket API docs (normally served at `/asyncapi-docs`).
|
||||
|
||||
You would only call this function yourself if you needed to override some parts,
|
||||
for example the URLs to use to load AsyncAPI Studio's JavaScript.
|
||||
"""
|
||||
navigation_html = ""
|
||||
if docs_url:
|
||||
navigation_html = f"""
|
||||
<div style="padding: 10px; background-color: #f5f5f5; border-bottom: 1px solid #ddd;">
|
||||
<a href="{docs_url}" style="color: #007bff; text-decoration: none; margin-right: 20px;">
|
||||
📄 OpenAPI Docs (REST API)
|
||||
</a>
|
||||
<span style="color: #666;">WebSocket API Documentation</span>
|
||||
</div>
|
||||
"""
|
||||
|
||||
html = f"""
|
||||
<!DOCTYPE html>
|
||||
<html>
|
||||
<head>
|
||||
<meta name="viewport" content="width=device-width, initial-scale=1.0">
|
||||
<link rel="shortcut icon" href="{asyncapi_favicon_url}">
|
||||
<title>{title}</title>
|
||||
<style>
|
||||
body {{
|
||||
margin: 0;
|
||||
padding: 0;
|
||||
}}
|
||||
#asyncapi {{
|
||||
height: 100vh;
|
||||
width: 100%;
|
||||
}}
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
{navigation_html}
|
||||
<div id="asyncapi"></div>
|
||||
<script src="{asyncapi_js_url}"></script>
|
||||
<script>
|
||||
(async function() {{
|
||||
const asyncapiSpec = await fetch('{asyncapi_url}').then(res => res.json());
|
||||
const AsyncApiStandalone = window.AsyncApiStandalone || window.AsyncAPIStandalone;
|
||||
if (AsyncApiStandalone) {{
|
||||
AsyncApiStandalone.render({{
|
||||
schema: asyncapiSpec,
|
||||
config: {{
|
||||
show: {{
|
||||
sidebar: true,
|
||||
info: true,
|
||||
servers: true,
|
||||
operations: true,
|
||||
messages: true,
|
||||
}},
|
||||
}},
|
||||
}}, document.getElementById('asyncapi'));
|
||||
}} else {{
|
||||
document.getElementById('asyncapi').innerHTML =
|
||||
'<div style="padding: 20px; text-align: center;">' +
|
||||
'<h2>Failed to load AsyncAPI Studio</h2>' +
|
||||
'<p>Please check your internet connection and try again.</p>' +
|
||||
'</div>';
|
||||
}}
|
||||
}})();
|
||||
</script>
|
||||
</body>
|
||||
</html>
|
||||
"""
|
||||
return HTMLResponse(html)
|
||||
|
||||
|
||||
def get_redoc_html(
|
||||
*,
|
||||
openapi_url: Annotated[
|
||||
|
|
|
|||
|
|
@ -0,0 +1,262 @@
|
|||
from fastapi import FastAPI, WebSocket
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
|
||||
def test_asyncapi_schema():
|
||||
"""Test AsyncAPI schema endpoint with WebSocket routes."""
|
||||
app = FastAPI(title="Test API", version="1.0.0")
|
||||
|
||||
@app.websocket("/ws")
|
||||
async def websocket_endpoint(websocket: WebSocket):
|
||||
await websocket.accept()
|
||||
await websocket.close()
|
||||
|
||||
@app.websocket("/ws/{item_id}")
|
||||
async def websocket_with_param(websocket: WebSocket, item_id: str):
|
||||
await websocket.accept()
|
||||
await websocket.close()
|
||||
|
||||
client = TestClient(app)
|
||||
response = client.get("/asyncapi.json")
|
||||
assert response.status_code == 200, response.text
|
||||
schema = response.json()
|
||||
assert schema["asyncapi"] == "2.6.0"
|
||||
assert schema["info"]["title"] == "Test API"
|
||||
assert schema["info"]["version"] == "1.0.0"
|
||||
assert "channels" in schema
|
||||
assert "/ws" in schema["channels"]
|
||||
assert "/ws/{item_id}" in schema["channels"]
|
||||
|
||||
|
||||
def test_asyncapi_no_websockets():
|
||||
"""Test AsyncAPI schema with no WebSocket routes."""
|
||||
app = FastAPI(title="Test API", version="1.0.0")
|
||||
|
||||
@app.get("/")
|
||||
def read_root():
|
||||
return {"message": "Hello World"}
|
||||
|
||||
client = TestClient(app)
|
||||
response = client.get("/asyncapi.json")
|
||||
assert response.status_code == 200, response.text
|
||||
schema = response.json()
|
||||
assert schema["asyncapi"] == "2.6.0"
|
||||
assert schema["info"]["title"] == "Test API"
|
||||
assert schema["channels"] == {}
|
||||
|
||||
|
||||
def test_asyncapi_caching():
|
||||
"""Test that AsyncAPI schema is cached."""
|
||||
app = FastAPI(title="Test API", version="1.0.0")
|
||||
|
||||
@app.websocket("/ws")
|
||||
async def websocket_endpoint(websocket: WebSocket):
|
||||
await websocket.accept()
|
||||
await websocket.close()
|
||||
|
||||
schema1 = app.asyncapi()
|
||||
schema2 = app.asyncapi()
|
||||
# Should return the same object (identity check)
|
||||
assert schema1 is schema2
|
||||
|
||||
|
||||
def test_asyncapi_ui():
|
||||
"""Test AsyncAPI UI endpoint."""
|
||||
app = FastAPI(title="Test API", version="1.0.0")
|
||||
|
||||
@app.websocket("/ws")
|
||||
async def websocket_endpoint(websocket: WebSocket):
|
||||
await websocket.accept()
|
||||
await websocket.close()
|
||||
|
||||
client = TestClient(app)
|
||||
response = client.get("/asyncapi-docs")
|
||||
assert response.status_code == 200, response.text
|
||||
assert response.headers["content-type"] == "text/html; charset=utf-8"
|
||||
assert "@asyncapi/react-component" in response.text
|
||||
assert "/asyncapi.json" in response.text
|
||||
|
||||
|
||||
def test_asyncapi_ui_navigation():
|
||||
"""Test navigation links in AsyncAPI UI."""
|
||||
app = FastAPI(title="Test API", version="1.0.0")
|
||||
|
||||
@app.websocket("/ws")
|
||||
async def websocket_endpoint(websocket: WebSocket):
|
||||
await websocket.accept()
|
||||
await websocket.close()
|
||||
|
||||
client = TestClient(app)
|
||||
response = client.get("/asyncapi-docs")
|
||||
assert response.status_code == 200, response.text
|
||||
# Should contain link to OpenAPI docs
|
||||
assert "/docs" in response.text
|
||||
assert "OpenAPI Docs" in response.text
|
||||
|
||||
|
||||
def test_swagger_ui_asyncapi_navigation():
|
||||
"""Test navigation link to AsyncAPI in Swagger UI."""
|
||||
app = FastAPI(title="Test API", version="1.0.0")
|
||||
|
||||
@app.get("/")
|
||||
def read_root():
|
||||
return {"message": "Hello World"}
|
||||
|
||||
@app.websocket("/ws")
|
||||
async def websocket_endpoint(websocket: WebSocket):
|
||||
await websocket.accept()
|
||||
await websocket.close()
|
||||
|
||||
client = TestClient(app)
|
||||
response = client.get("/docs")
|
||||
assert response.status_code == 200, response.text
|
||||
# Should contain link to AsyncAPI docs
|
||||
assert "/asyncapi-docs" in response.text
|
||||
assert "AsyncAPI Docs" in response.text
|
||||
|
||||
|
||||
def test_asyncapi_custom_urls():
|
||||
"""Test custom AsyncAPI URLs."""
|
||||
app = FastAPI(
|
||||
title="Test API",
|
||||
version="1.0.0",
|
||||
asyncapi_url="/custom/asyncapi.json",
|
||||
asyncapi_docs_url="/custom/asyncapi-docs",
|
||||
)
|
||||
|
||||
@app.websocket("/ws")
|
||||
async def websocket_endpoint(websocket: WebSocket):
|
||||
await websocket.accept()
|
||||
await websocket.close()
|
||||
|
||||
client = TestClient(app)
|
||||
# Test custom JSON endpoint
|
||||
response = client.get("/custom/asyncapi.json")
|
||||
assert response.status_code == 200, response.text
|
||||
schema = response.json()
|
||||
assert schema["asyncapi"] == "2.6.0"
|
||||
|
||||
# Test custom UI endpoint
|
||||
response = client.get("/custom/asyncapi-docs")
|
||||
assert response.status_code == 200, response.text
|
||||
assert "/custom/asyncapi.json" in response.text
|
||||
|
||||
# Default endpoints should not exist
|
||||
response = client.get("/asyncapi.json")
|
||||
assert response.status_code == 404
|
||||
response = client.get("/asyncapi-docs")
|
||||
assert response.status_code == 404
|
||||
|
||||
|
||||
def test_asyncapi_disabled():
|
||||
"""Test when AsyncAPI is disabled."""
|
||||
app = FastAPI(
|
||||
title="Test API",
|
||||
version="1.0.0",
|
||||
asyncapi_url=None,
|
||||
)
|
||||
|
||||
@app.websocket("/ws")
|
||||
async def websocket_endpoint(websocket: WebSocket):
|
||||
await websocket.accept()
|
||||
await websocket.close()
|
||||
|
||||
client = TestClient(app)
|
||||
# Endpoints should return 404
|
||||
response = client.get("/asyncapi.json")
|
||||
assert response.status_code == 404
|
||||
response = client.get("/asyncapi-docs")
|
||||
assert response.status_code == 404
|
||||
|
||||
|
||||
def test_asyncapi_channel_structure():
|
||||
"""Test AsyncAPI channel structure."""
|
||||
app = FastAPI(title="Test API", version="1.0.0")
|
||||
|
||||
@app.websocket("/ws")
|
||||
async def websocket_endpoint(websocket: WebSocket):
|
||||
await websocket.accept()
|
||||
await websocket.close()
|
||||
|
||||
client = TestClient(app)
|
||||
response = client.get("/asyncapi.json")
|
||||
assert response.status_code == 200, response.text
|
||||
schema = response.json()
|
||||
channel = schema["channels"]["/ws"]
|
||||
assert "subscribe" in channel
|
||||
assert "operationId" in channel["subscribe"]
|
||||
assert "message" in channel["subscribe"]
|
||||
|
||||
|
||||
def test_asyncapi_multiple_websockets():
|
||||
"""Test AsyncAPI with multiple WebSocket routes."""
|
||||
app = FastAPI(title="Test API", version="1.0.0")
|
||||
|
||||
@app.websocket("/ws1")
|
||||
async def websocket1(websocket: WebSocket):
|
||||
await websocket.accept()
|
||||
await websocket.close()
|
||||
|
||||
@app.websocket("/ws2")
|
||||
async def websocket2(websocket: WebSocket):
|
||||
await websocket.accept()
|
||||
await websocket.close()
|
||||
|
||||
@app.websocket("/ws3/{param}")
|
||||
async def websocket3(websocket: WebSocket, param: str):
|
||||
await websocket.accept()
|
||||
await websocket.close()
|
||||
|
||||
client = TestClient(app)
|
||||
response = client.get("/asyncapi.json")
|
||||
assert response.status_code == 200, response.text
|
||||
schema = response.json()
|
||||
assert len(schema["channels"]) == 3
|
||||
assert "/ws1" in schema["channels"]
|
||||
assert "/ws2" in schema["channels"]
|
||||
assert "/ws3/{param}" in schema["channels"]
|
||||
|
||||
|
||||
def test_asyncapi_with_metadata():
|
||||
"""Test AsyncAPI schema includes app metadata."""
|
||||
app = FastAPI(
|
||||
title="My API",
|
||||
version="2.0.0",
|
||||
summary="Test summary",
|
||||
description="Test description",
|
||||
)
|
||||
|
||||
@app.websocket("/ws")
|
||||
async def websocket_endpoint(websocket: WebSocket):
|
||||
await websocket.accept()
|
||||
await websocket.close()
|
||||
|
||||
client = TestClient(app)
|
||||
response = client.get("/asyncapi.json")
|
||||
assert response.status_code == 200, response.text
|
||||
schema = response.json()
|
||||
assert schema["info"]["title"] == "My API"
|
||||
assert schema["info"]["version"] == "2.0.0"
|
||||
assert schema["info"]["summary"] == "Test summary"
|
||||
assert schema["info"]["description"] == "Test description"
|
||||
|
||||
|
||||
def test_asyncapi_ui_no_docs_url():
|
||||
"""Test AsyncAPI UI when docs_url is None."""
|
||||
app = FastAPI(
|
||||
title="Test API",
|
||||
version="1.0.0",
|
||||
docs_url=None,
|
||||
)
|
||||
|
||||
@app.websocket("/ws")
|
||||
async def websocket_endpoint(websocket: WebSocket):
|
||||
await websocket.accept()
|
||||
await websocket.close()
|
||||
|
||||
client = TestClient(app)
|
||||
response = client.get("/asyncapi-docs")
|
||||
assert response.status_code == 200, response.text
|
||||
# Should not contain link to /docs if docs_url is None
|
||||
# But navigation should still work (just won't show the link)
|
||||
assert "/asyncapi.json" in response.text
|
||||
Loading…
Reference in New Issue