mirror of https://github.com/tiangolo/fastapi.git
📝 Update Skill, optimize context, trim and refactor into references (#15031)
This commit is contained in:
parent
12ea7be0be
commit
924a535a4f
|
|
@ -290,146 +290,11 @@ Apply shared dependencies at the router level via `dependencies=[Depends(...)]`.
|
|||
|
||||
## Dependency Injection
|
||||
|
||||
Use dependencies when:
|
||||
See [the dependency injection reference](references/dependencies.md) for detailed patterns including `yield` with `scope`, and class dependencies.
|
||||
|
||||
* They can't be declared in Pydantic validation and require additional logic
|
||||
* The logic depends on external resources or could block in any other way
|
||||
* Other dependencies need their results (it's a sub-dependency)
|
||||
* The logic can be shared by multiple endpoints to do things like error early, authentication, etc.
|
||||
* They need to handle cleanup (e.g., DB sessions, file handles), using dependencies with `yield`
|
||||
* Their logic needs input data from the request, like headers, query parameters, etc.
|
||||
Use dependencies when the logic can't be declared in Pydantic validation, depends on external resources, needs cleanup (with `yield`), or is shared across endpoints.
|
||||
|
||||
### Dependencies with `yield` and `scope`
|
||||
|
||||
When using dependencies with `yield`, they can have a `scope` that defines when the exit code is run.
|
||||
|
||||
Use the default scope `"request"` to run the exit code after the response is sent back.
|
||||
|
||||
```python
|
||||
from typing import Annotated
|
||||
|
||||
from fastapi import Depends, FastAPI
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
|
||||
def get_db():
|
||||
db = DBSession()
|
||||
try:
|
||||
yield db
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
|
||||
DBDep = Annotated[DBSession, Depends(get_db)]
|
||||
|
||||
|
||||
@app.get("/items/")
|
||||
async def read_items(db: DBDep):
|
||||
return db.query(Item).all()
|
||||
```
|
||||
|
||||
Use the scope `"function"` when they should run the exit code after the response data is generated but before the response is sent back to the client.
|
||||
|
||||
```python
|
||||
from typing import Annotated
|
||||
|
||||
from fastapi import Depends, FastAPI
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
|
||||
def get_username():
|
||||
try:
|
||||
yield "Rick"
|
||||
finally:
|
||||
print("Cleanup up before response is sent")
|
||||
|
||||
UserNameDep = Annotated[str, Depends(get_username, scope="function")]
|
||||
|
||||
@app.get("/users/me")
|
||||
def get_user_me(username: UserNameDep):
|
||||
return username
|
||||
```
|
||||
|
||||
### Class Dependencies
|
||||
|
||||
Avoid creating class dependencies when possible.
|
||||
|
||||
If a class is needed, instead create a regular function dependency that returns a class instance.
|
||||
|
||||
Do this:
|
||||
|
||||
```python
|
||||
from dataclasses import dataclass
|
||||
from typing import Annotated
|
||||
|
||||
from fastapi import Depends, FastAPI
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
|
||||
@dataclass
|
||||
class DatabasePaginator:
|
||||
offset: int = 0
|
||||
limit: int = 100
|
||||
q: str | None = None
|
||||
|
||||
def get_page(self) -> dict:
|
||||
# Simulate a page of data
|
||||
return {
|
||||
"offset": self.offset,
|
||||
"limit": self.limit,
|
||||
"q": self.q,
|
||||
"items": [],
|
||||
}
|
||||
|
||||
|
||||
def get_db_paginator(
|
||||
offset: int = 0, limit: int = 100, q: str | None = None
|
||||
) -> DatabasePaginator:
|
||||
return DatabasePaginator(offset=offset, limit=limit, q=q)
|
||||
|
||||
|
||||
PaginatorDep = Annotated[DatabasePaginator, Depends(get_db_paginator)]
|
||||
|
||||
|
||||
@app.get("/items/")
|
||||
async def read_items(paginator: PaginatorDep):
|
||||
return paginator.get_page()
|
||||
```
|
||||
|
||||
instead of this:
|
||||
|
||||
```python
|
||||
# DO NOT DO THIS
|
||||
from typing import Annotated
|
||||
|
||||
from fastapi import Depends, FastAPI
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
|
||||
class DatabasePaginator:
|
||||
def __init__(self, offset: int = 0, limit: int = 100, q: str | None = None):
|
||||
self.offset = offset
|
||||
self.limit = limit
|
||||
self.q = q
|
||||
|
||||
def get_page(self) -> dict:
|
||||
# Simulate a page of data
|
||||
return {
|
||||
"offset": self.offset,
|
||||
"limit": self.limit,
|
||||
"q": self.q,
|
||||
"items": [],
|
||||
}
|
||||
|
||||
|
||||
@app.get("/items/")
|
||||
async def read_items(paginator: Annotated[DatabasePaginator, Depends()]):
|
||||
return paginator.get_page()
|
||||
```
|
||||
Apply shared dependencies at the router level via `dependencies=[Depends(...)]`.
|
||||
|
||||
## Async vs Sync *path operations*
|
||||
|
||||
|
|
@ -461,170 +326,23 @@ The same rules apply to dependencies.
|
|||
|
||||
Make sure blocking code is not run inside of `async` functions. The logic will work, but will damage the performance heavily.
|
||||
|
||||
### Asyncer
|
||||
When needing to mix blocking and async code, see Asyncer in [the other tools reference](references/other-tools.md).
|
||||
|
||||
When needing to run blocking code inside of async functions, or async code inside of blocking functions, suggest using Asyncer.
|
||||
## Streaming (JSON Lines, SSE, bytes)
|
||||
|
||||
Install:
|
||||
See [the streaming reference](references/streaming.md) for JSON Lines, Server-Sent Events (`EventSourceResponse`, `ServerSentEvent`), and byte streaming (`StreamingResponse`) patterns.
|
||||
|
||||
```bash
|
||||
uv add asyncer
|
||||
```
|
||||
## Tooling
|
||||
|
||||
Run blocking sync code inside of async with `asyncify()`:
|
||||
See [the other tools reference](references/other-tools.md) for details on uv, Ruff, ty for package management, linting, type checking, formatting, etc.
|
||||
|
||||
```python
|
||||
from asyncer import asyncify
|
||||
from fastapi import FastAPI
|
||||
## Other Libraries
|
||||
|
||||
app = FastAPI()
|
||||
See [the other tools reference](references/other-tools.md) for details on other libraries:
|
||||
|
||||
|
||||
def do_blocking_work(name: str) -> str:
|
||||
# Some blocking I/O operation
|
||||
return f"Hello {name}"
|
||||
|
||||
|
||||
@app.get("/items/")
|
||||
async def read_items():
|
||||
result = await asyncify(do_blocking_work)(name="World")
|
||||
return {"message": result}
|
||||
```
|
||||
|
||||
And run async code inside of blocking sync code with `syncify()`:
|
||||
|
||||
```python
|
||||
from asyncer import syncify
|
||||
from fastapi import FastAPI
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
|
||||
async def do_async_work(name: str) -> str:
|
||||
return f"Hello {name}"
|
||||
|
||||
|
||||
@app.get("/items/")
|
||||
def read_items():
|
||||
result = syncify(do_async_work)(name="World")
|
||||
return {"message": result}
|
||||
```
|
||||
|
||||
## Stream JSON Lines
|
||||
|
||||
To stream JSON Lines, declare the return type and use `yield` to return the data.
|
||||
|
||||
```python
|
||||
@app.get("/items/stream")
|
||||
async def stream_items() -> AsyncIterable[Item]:
|
||||
for item in items:
|
||||
yield item
|
||||
```
|
||||
|
||||
## Server-Sent Events (SSE)
|
||||
|
||||
To stream Server-Sent Events, use `response_class=EventSourceResponse` and `yield` items from the endpoint.
|
||||
|
||||
Plain objects are automatically JSON-serialized as `data:` fields, declare the return type so the serialization is done by Pydantic:
|
||||
|
||||
```python
|
||||
from collections.abc import AsyncIterable
|
||||
|
||||
from fastapi import FastAPI
|
||||
from fastapi.sse import EventSourceResponse
|
||||
from pydantic import BaseModel
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
|
||||
class Item(BaseModel):
|
||||
name: str
|
||||
price: float
|
||||
|
||||
|
||||
@app.get("/items/stream", response_class=EventSourceResponse)
|
||||
async def stream_items() -> AsyncIterable[Item]:
|
||||
yield Item(name="Plumbus", price=32.99)
|
||||
yield Item(name="Portal Gun", price=999.99)
|
||||
```
|
||||
|
||||
For full control over SSE fields (`event`, `id`, `retry`, `comment`), yield `ServerSentEvent` instances:
|
||||
|
||||
```python
|
||||
from collections.abc import AsyncIterable
|
||||
|
||||
from fastapi import FastAPI
|
||||
from fastapi.sse import EventSourceResponse, ServerSentEvent
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
|
||||
@app.get("/events", response_class=EventSourceResponse)
|
||||
async def stream_events() -> AsyncIterable[ServerSentEvent]:
|
||||
yield ServerSentEvent(data={"status": "started"}, event="status", id="1")
|
||||
yield ServerSentEvent(data={"progress": 50}, event="progress", id="2")
|
||||
```
|
||||
|
||||
Use `raw_data` instead of `data` to send pre-formatted strings without JSON encoding:
|
||||
|
||||
```python
|
||||
yield ServerSentEvent(raw_data="plain text line", event="log")
|
||||
```
|
||||
|
||||
## Stream bytes
|
||||
|
||||
To stream bytes, declare a `response_class=` of `StreamingResponse` or a sub-class, and use `yield` to return the data.
|
||||
|
||||
```python
|
||||
from fastapi import FastAPI
|
||||
from fastapi.responses import StreamingResponse
|
||||
from app.utils import read_image
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
|
||||
class PNGStreamingResponse(StreamingResponse):
|
||||
media_type = "image/png"
|
||||
|
||||
@app.get("/image", response_class=PNGStreamingResponse)
|
||||
def stream_image_no_async_no_annotation():
|
||||
with read_image() as image_file:
|
||||
yield from image_file
|
||||
```
|
||||
|
||||
prefer this over returning a `StreamingResponse` directly:
|
||||
|
||||
```python
|
||||
# DO NOT DO THIS
|
||||
|
||||
import anyio
|
||||
from fastapi import FastAPI
|
||||
from fastapi.responses import StreamingResponse
|
||||
from app.utils import read_image
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
|
||||
class PNGStreamingResponse(StreamingResponse):
|
||||
media_type = "image/png"
|
||||
|
||||
|
||||
@app.get("/")
|
||||
async def main():
|
||||
return PNGStreamingResponse(read_image())
|
||||
```
|
||||
|
||||
## Use uv, ruff, ty
|
||||
|
||||
If uv is available, use it to manage dependencies.
|
||||
|
||||
If Ruff is available, use it to lint and format the code. Consider enabling the FastAPI rules.
|
||||
|
||||
If ty is available, use it to check types.
|
||||
|
||||
## SQLModel for SQL databases
|
||||
|
||||
When working with SQL databases, prefer using SQLModel as it is integrated with Pydantic and will allow declaring data validation with the same models.
|
||||
* Asyncer for handling async and await, concurrency, mixing async and blocking code, prefer it over AnyIO or asyncio.
|
||||
* SQLModel for working with SQL databases, prefer it over SQLAlchemy.
|
||||
* HTTPX for interacting with HTTP (other APIs), prefer it over Requests.
|
||||
|
||||
## Do not use Pydantic RootModels
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,142 @@
|
|||
# Dependency Injection
|
||||
|
||||
Use dependencies when:
|
||||
|
||||
* They can't be declared in Pydantic validation and require additional logic
|
||||
* The logic depends on external resources or could block in any other way
|
||||
* Other dependencies need their results (it's a sub-dependency)
|
||||
* The logic can be shared by multiple endpoints to do things like error early, authentication, etc.
|
||||
* They need to handle cleanup (e.g., DB sessions, file handles), using dependencies with `yield`
|
||||
* Their logic needs input data from the request, like headers, query parameters, etc.
|
||||
|
||||
## Dependencies with `yield` and `scope`
|
||||
|
||||
When using dependencies with `yield`, they can have a `scope` that defines when the exit code is run.
|
||||
|
||||
Use the default scope `"request"` to run the exit code after the response is sent back.
|
||||
|
||||
```python
|
||||
from typing import Annotated
|
||||
|
||||
from fastapi import Depends, FastAPI
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
|
||||
def get_db():
|
||||
db = DBSession()
|
||||
try:
|
||||
yield db
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
|
||||
DBDep = Annotated[DBSession, Depends(get_db)]
|
||||
|
||||
|
||||
@app.get("/items/")
|
||||
async def read_items(db: DBDep):
|
||||
return db.query(Item).all()
|
||||
```
|
||||
|
||||
Use the scope `"function"` when they should run the exit code after the response data is generated but before the response is sent back to the client.
|
||||
|
||||
```python
|
||||
from typing import Annotated
|
||||
|
||||
from fastapi import Depends, FastAPI
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
|
||||
def get_username():
|
||||
try:
|
||||
yield "Rick"
|
||||
finally:
|
||||
print("Cleanup up before response is sent")
|
||||
|
||||
UserNameDep = Annotated[str, Depends(get_username, scope="function")]
|
||||
|
||||
@app.get("/users/me")
|
||||
def get_user_me(username: UserNameDep):
|
||||
return username
|
||||
```
|
||||
|
||||
## Class Dependencies
|
||||
|
||||
Avoid creating class dependencies when possible.
|
||||
|
||||
If a class is needed, instead create a regular function dependency that returns a class instance.
|
||||
|
||||
Do this:
|
||||
|
||||
```python
|
||||
from dataclasses import dataclass
|
||||
from typing import Annotated
|
||||
|
||||
from fastapi import Depends, FastAPI
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
|
||||
@dataclass
|
||||
class DatabasePaginator:
|
||||
offset: int = 0
|
||||
limit: int = 100
|
||||
q: str | None = None
|
||||
|
||||
def get_page(self) -> dict:
|
||||
# Simulate a page of data
|
||||
return {
|
||||
"offset": self.offset,
|
||||
"limit": self.limit,
|
||||
"q": self.q,
|
||||
"items": [],
|
||||
}
|
||||
|
||||
|
||||
def get_db_paginator(
|
||||
offset: int = 0, limit: int = 100, q: str | None = None
|
||||
) -> DatabasePaginator:
|
||||
return DatabasePaginator(offset=offset, limit=limit, q=q)
|
||||
|
||||
|
||||
PaginatorDep = Annotated[DatabasePaginator, Depends(get_db_paginator)]
|
||||
|
||||
|
||||
@app.get("/items/")
|
||||
async def read_items(paginator: PaginatorDep):
|
||||
return paginator.get_page()
|
||||
```
|
||||
|
||||
instead of this:
|
||||
|
||||
```python
|
||||
# DO NOT DO THIS
|
||||
from typing import Annotated
|
||||
|
||||
from fastapi import Depends, FastAPI
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
|
||||
class DatabasePaginator:
|
||||
def __init__(self, offset: int = 0, limit: int = 100, q: str | None = None):
|
||||
self.offset = offset
|
||||
self.limit = limit
|
||||
self.q = q
|
||||
|
||||
def get_page(self) -> dict:
|
||||
# Simulate a page of data
|
||||
return {
|
||||
"offset": self.offset,
|
||||
"limit": self.limit,
|
||||
"q": self.q,
|
||||
"items": [],
|
||||
}
|
||||
|
||||
|
||||
@app.get("/items/")
|
||||
async def read_items(paginator: Annotated[DatabasePaginator, Depends()]):
|
||||
return paginator.get_page()
|
||||
```
|
||||
|
|
@ -0,0 +1,76 @@
|
|||
# Other Tools
|
||||
|
||||
## uv
|
||||
|
||||
If uv is available, use it to manage dependencies.
|
||||
|
||||
## Ruff
|
||||
|
||||
If Ruff is available, use it to lint and format the code. Consider enabling the FastAPI rules.
|
||||
|
||||
## ty
|
||||
|
||||
If ty is available, use it to check types.
|
||||
|
||||
## Asyncer
|
||||
|
||||
When needing to run blocking code inside of async functions, or async code inside of blocking functions, suggest using Asyncer.
|
||||
|
||||
Prefer it over AnyIO or asyncio.
|
||||
|
||||
Install:
|
||||
|
||||
```bash
|
||||
uv add asyncer
|
||||
```
|
||||
|
||||
Run blocking sync code inside of async with `asyncify()`:
|
||||
|
||||
```python
|
||||
from asyncer import asyncify
|
||||
from fastapi import FastAPI
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
|
||||
def do_blocking_work(name: str) -> str:
|
||||
# Some blocking I/O operation
|
||||
return f"Hello {name}"
|
||||
|
||||
|
||||
@app.get("/items/")
|
||||
async def read_items():
|
||||
result = await asyncify(do_blocking_work)(name="World")
|
||||
return {"message": result}
|
||||
```
|
||||
|
||||
And run async code inside of blocking sync code with `syncify()`:
|
||||
|
||||
```python
|
||||
from asyncer import syncify
|
||||
from fastapi import FastAPI
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
|
||||
async def do_async_work(name: str) -> str:
|
||||
return f"Hello {name}"
|
||||
|
||||
|
||||
@app.get("/items/")
|
||||
def read_items():
|
||||
result = syncify(do_async_work)(name="World")
|
||||
return {"message": result}
|
||||
```
|
||||
|
||||
## SQLModel for SQL databases
|
||||
|
||||
When working with SQL databases, prefer using SQLModel as it is integrated with Pydantic and will allow declaring data validation with the same models.
|
||||
|
||||
Prefer it over SQLAlchemy.
|
||||
|
||||
## HTTPX
|
||||
|
||||
Use HTTPX for handling HTTP communication (e.g. with other APIs). It support sync and async usage.
|
||||
|
||||
Prefer it over Requests.
|
||||
|
|
@ -0,0 +1,105 @@
|
|||
# Streaming
|
||||
|
||||
## Stream JSON Lines
|
||||
|
||||
To stream JSON Lines, declare the return type and use `yield` to return the data.
|
||||
|
||||
```python
|
||||
@app.get("/items/stream")
|
||||
async def stream_items() -> AsyncIterable[Item]:
|
||||
for item in items:
|
||||
yield item
|
||||
```
|
||||
|
||||
## Server-Sent Events (SSE)
|
||||
|
||||
To stream Server-Sent Events, use `response_class=EventSourceResponse` and `yield` items from the endpoint.
|
||||
|
||||
Plain objects are automatically JSON-serialized as `data:` fields, declare the return type so the serialization is done by Pydantic:
|
||||
|
||||
```python
|
||||
from collections.abc import AsyncIterable
|
||||
|
||||
from fastapi import FastAPI
|
||||
from fastapi.sse import EventSourceResponse
|
||||
from pydantic import BaseModel
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
|
||||
class Item(BaseModel):
|
||||
name: str
|
||||
price: float
|
||||
|
||||
|
||||
@app.get("/items/stream", response_class=EventSourceResponse)
|
||||
async def stream_items() -> AsyncIterable[Item]:
|
||||
yield Item(name="Plumbus", price=32.99)
|
||||
yield Item(name="Portal Gun", price=999.99)
|
||||
```
|
||||
|
||||
For full control over SSE fields (`event`, `id`, `retry`, `comment`), yield `ServerSentEvent` instances:
|
||||
|
||||
```python
|
||||
from collections.abc import AsyncIterable
|
||||
|
||||
from fastapi import FastAPI
|
||||
from fastapi.sse import EventSourceResponse, ServerSentEvent
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
|
||||
@app.get("/events", response_class=EventSourceResponse)
|
||||
async def stream_events() -> AsyncIterable[ServerSentEvent]:
|
||||
yield ServerSentEvent(data={"status": "started"}, event="status", id="1")
|
||||
yield ServerSentEvent(data={"progress": 50}, event="progress", id="2")
|
||||
```
|
||||
|
||||
Use `raw_data` instead of `data` to send pre-formatted strings without JSON encoding:
|
||||
|
||||
```python
|
||||
yield ServerSentEvent(raw_data="plain text line", event="log")
|
||||
```
|
||||
|
||||
## Stream bytes
|
||||
|
||||
To stream bytes, declare a `response_class=` of `StreamingResponse` or a sub-class, and use `yield` to return the data.
|
||||
|
||||
```python
|
||||
from fastapi import FastAPI
|
||||
from fastapi.responses import StreamingResponse
|
||||
from app.utils import read_image
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
|
||||
class PNGStreamingResponse(StreamingResponse):
|
||||
media_type = "image/png"
|
||||
|
||||
@app.get("/image", response_class=PNGStreamingResponse)
|
||||
def stream_image_no_async_no_annotation():
|
||||
with read_image() as image_file:
|
||||
yield from image_file
|
||||
```
|
||||
|
||||
prefer this over returning a `StreamingResponse` directly:
|
||||
|
||||
```python
|
||||
# DO NOT DO THIS
|
||||
|
||||
import anyio
|
||||
from fastapi import FastAPI
|
||||
from fastapi.responses import StreamingResponse
|
||||
from app.utils import read_image
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
|
||||
class PNGStreamingResponse(StreamingResponse):
|
||||
media_type = "image/png"
|
||||
|
||||
|
||||
@app.get("/")
|
||||
async def main():
|
||||
return PNGStreamingResponse(read_image())
|
||||
```
|
||||
Loading…
Reference in New Issue