Add variants for `custom_request_and_route/tutorial001`

This commit is contained in:
Yurii Motov 2025-11-27 16:58:37 +01:00
parent a35fc493c5
commit 6e902ac8b9
7 changed files with 201 additions and 9 deletions

View File

@ -42,7 +42,7 @@ If there's no `gzip` in the header, it will not try to decompress the body.
That way, the same route class can handle gzip compressed or uncompressed requests. That way, the same route class can handle gzip compressed or uncompressed requests.
{* ../../docs_src/custom_request_and_route/tutorial001.py hl[8:15] *} {* ../../docs_src/custom_request_and_route/tutorial001_an_py310.py hl[9:16] *}
### Create a custom `GzipRoute` class { #create-a-custom-gziproute-class } ### Create a custom `GzipRoute` class { #create-a-custom-gziproute-class }
@ -54,7 +54,7 @@ This method returns a function. And that function is what will receive a request
Here we use it to create a `GzipRequest` from the original request. Here we use it to create a `GzipRequest` from the original request.
{* ../../docs_src/custom_request_and_route/tutorial001.py hl[18:26] *} {* ../../docs_src/custom_request_and_route/tutorial001_an_py310.py hl[19:27] *}
/// note | Technical Details /// note | Technical Details

View File

@ -0,0 +1,36 @@
import gzip
from typing import Callable, List
from fastapi import Body, FastAPI, Request, Response
from fastapi.routing import APIRoute
from typing_extensions import Annotated
class GzipRequest(Request):
async def body(self) -> bytes:
if not hasattr(self, "_body"):
body = await super().body()
if "gzip" in self.headers.getlist("Content-Encoding"):
body = gzip.decompress(body)
self._body = body
return self._body
class GzipRoute(APIRoute):
def get_route_handler(self) -> Callable:
original_route_handler = super().get_route_handler()
async def custom_route_handler(request: Request) -> Response:
request = GzipRequest(request.scope, request.receive)
return await original_route_handler(request)
return custom_route_handler
app = FastAPI()
app.router.route_class = GzipRoute
@app.post("/sum")
async def sum_numbers(numbers: Annotated[List[int], Body()]):
return {"sum": sum(numbers)}

View File

@ -0,0 +1,36 @@
import gzip
from collections.abc import Callable
from typing import Annotated
from fastapi import Body, FastAPI, Request, Response
from fastapi.routing import APIRoute
class GzipRequest(Request):
async def body(self) -> bytes:
if not hasattr(self, "_body"):
body = await super().body()
if "gzip" in self.headers.getlist("Content-Encoding"):
body = gzip.decompress(body)
self._body = body
return self._body
class GzipRoute(APIRoute):
def get_route_handler(self) -> Callable:
original_route_handler = super().get_route_handler()
async def custom_route_handler(request: Request) -> Response:
request = GzipRequest(request.scope, request.receive)
return await original_route_handler(request)
return custom_route_handler
app = FastAPI()
app.router.route_class = GzipRoute
@app.post("/sum")
async def sum_numbers(numbers: Annotated[list[int], Body()]):
return {"sum": sum(numbers)}

View File

@ -0,0 +1,35 @@
import gzip
from typing import Annotated, Callable
from fastapi import Body, FastAPI, Request, Response
from fastapi.routing import APIRoute
class GzipRequest(Request):
async def body(self) -> bytes:
if not hasattr(self, "_body"):
body = await super().body()
if "gzip" in self.headers.getlist("Content-Encoding"):
body = gzip.decompress(body)
self._body = body
return self._body
class GzipRoute(APIRoute):
def get_route_handler(self) -> Callable:
original_route_handler = super().get_route_handler()
async def custom_route_handler(request: Request) -> Response:
request = GzipRequest(request.scope, request.receive)
return await original_route_handler(request)
return custom_route_handler
app = FastAPI()
app.router.route_class = GzipRoute
@app.post("/sum")
async def sum_numbers(numbers: Annotated[list[int], Body()]):
return {"sum": sum(numbers)}

View File

@ -0,0 +1,35 @@
import gzip
from collections.abc import Callable
from fastapi import Body, FastAPI, Request, Response
from fastapi.routing import APIRoute
class GzipRequest(Request):
async def body(self) -> bytes:
if not hasattr(self, "_body"):
body = await super().body()
if "gzip" in self.headers.getlist("Content-Encoding"):
body = gzip.decompress(body)
self._body = body
return self._body
class GzipRoute(APIRoute):
def get_route_handler(self) -> Callable:
original_route_handler = super().get_route_handler()
async def custom_route_handler(request: Request) -> Response:
request = GzipRequest(request.scope, request.receive)
return await original_route_handler(request)
return custom_route_handler
app = FastAPI()
app.router.route_class = GzipRoute
@app.post("/sum")
async def sum_numbers(numbers: list[int] = Body()):
return {"sum": sum(numbers)}

View File

@ -0,0 +1,35 @@
import gzip
from typing import Callable
from fastapi import Body, FastAPI, Request, Response
from fastapi.routing import APIRoute
class GzipRequest(Request):
async def body(self) -> bytes:
if not hasattr(self, "_body"):
body = await super().body()
if "gzip" in self.headers.getlist("Content-Encoding"):
body = gzip.decompress(body)
self._body = body
return self._body
class GzipRoute(APIRoute):
def get_route_handler(self) -> Callable:
original_route_handler = super().get_route_handler()
async def custom_route_handler(request: Request) -> Response:
request = GzipRequest(request.scope, request.receive)
return await original_route_handler(request)
return custom_route_handler
app = FastAPI()
app.router.route_class = GzipRoute
@app.post("/sum")
async def sum_numbers(numbers: list[int] = Body()):
return {"sum": sum(numbers)}

View File

@ -1,23 +1,38 @@
import gzip import gzip
import importlib
import json import json
import pytest import pytest
from fastapi import Request from fastapi import Request
from fastapi.testclient import TestClient from fastapi.testclient import TestClient
from docs_src.custom_request_and_route.tutorial001 import app from tests.utils import needs_py39, needs_py310
@app.get("/check-class") @pytest.fixture(
async def check_gzip_request(request: Request): name="client",
return {"request_class": type(request).__name__} params=[
pytest.param("tutorial001"),
pytest.param("tutorial001_py39", marks=needs_py39),
pytest.param("tutorial001_py310", marks=needs_py310),
pytest.param("tutorial001_an"),
pytest.param("tutorial001_an_py39", marks=needs_py39),
pytest.param("tutorial001_an_py310", marks=needs_py310),
],
)
def get_client(request: pytest.FixtureRequest):
mod = importlib.import_module(f"docs_src.custom_request_and_route.{request.param}")
@mod.app.get("/check-class")
async def check_gzip_request(request: Request):
return {"request_class": type(request).__name__}
client = TestClient(app) client = TestClient(mod.app)
return client
@pytest.mark.parametrize("compress", [True, False]) @pytest.mark.parametrize("compress", [True, False])
def test_gzip_request(compress): def test_gzip_request(client: TestClient, compress):
n = 1000 n = 1000
headers = {} headers = {}
body = [1] * n body = [1] * n
@ -30,6 +45,6 @@ def test_gzip_request(compress):
assert response.json() == {"sum": n} assert response.json() == {"sum": n}
def test_request_class(): def test_request_class(client: TestClient):
response = client.get("/check-class") response = client.get("/check-class")
assert response.json() == {"request_class": "GzipRequest"} assert response.json() == {"request_class": "GzipRequest"}