Refactor code for improved readability and update tests to handle Pydantic v1 and v2 differences.

This commit is contained in:
Arif Dogan 2025-09-18 23:40:25 +02:00
parent 6b907a57f8
commit 93e98d5cb7
No known key found for this signature in database
GPG Key ID: A2131177E83422CE
3 changed files with 45 additions and 45 deletions

View File

@ -85,8 +85,7 @@ def _prepare_response_content(
exclude_none: bool = False,
) -> Any:
if isinstance(res, BaseModel):
read_with_orm_mode = getattr(
_get_model_config(res), "read_with_orm_mode", None)
read_with_orm_mode = getattr(_get_model_config(res), "read_with_orm_mode", None)
if read_with_orm_mode:
# Let from_orm extract the data from this model instead of converting
# it now to a dict.
@ -165,8 +164,7 @@ async def serialize_response(
exclude_none=exclude_none,
)
if is_coroutine:
value, errors_ = field.validate(
response_content, {}, loc=("response",))
value, errors_ = field.validate(response_content, {}, loc=("response",))
else:
value, errors_ = await run_in_threadpool(
field.validate, response_content, {}, loc=("response",)
@ -221,8 +219,7 @@ def get_request_handler(
dependant: Dependant,
body_field: Optional[ModelField] = None,
status_code: Optional[int] = None,
response_class: Union[Type[Response],
DefaultPlaceholder] = Default(JSONResponse),
response_class: Union[Type[Response], DefaultPlaceholder] = Default(JSONResponse),
response_field: Optional[ModelField] = None,
response_model_include: Optional[IncEx] = None,
response_model_exclude: Optional[IncEx] = None,
@ -235,8 +232,7 @@ def get_request_handler(
) -> Callable[[Request], Coroutine[Any, Any, Response]]:
assert dependant.call is not None, "dependant.call must be a function"
is_coroutine = asyncio.iscoroutinefunction(dependant.call)
is_body_form = body_field and isinstance(
body_field.field_info, params.Form)
is_body_form = body_field and isinstance(body_field.field_info, params.Form)
if isinstance(response_class, DefaultPlaceholder):
actual_response_class: Type[Response] = response_class.value
else:
@ -255,8 +251,7 @@ def get_request_handler(
body_bytes = await request.body()
if body_bytes:
json_body: Any = Undefined
content_type_value = request.headers.get(
"content-type")
content_type_value = request.headers.get("content-type")
if not content_type_value:
json_body = await request.json()
else:
@ -273,8 +268,7 @@ def get_request_handler(
except json.JSONDecodeError as e:
lines_before = e.doc[: e.pos].split("\n")
line_number = len(lines_before)
column_number = len(
lines_before[-1]) + 1 if lines_before else 1
column_number = len(lines_before[-1]) + 1 if lines_before else 1
start_pos = max(0, e.pos - 40)
end_pos = min(len(e.doc), e.pos + 40)
@ -359,12 +353,10 @@ def get_request_handler(
exclude_none=response_model_exclude_none,
is_coroutine=is_coroutine,
)
response = actual_response_class(
content, **response_args)
response = actual_response_class(content, **response_args)
if not is_body_allowed_for_status_code(response.status_code):
response.body = b""
response.headers.raw.extend(
solved_result.response.headers.raw)
response.headers.raw.extend(solved_result.response.headers.raw)
if errors:
validation_error = RequestValidationError(
_normalize_errors(errors), body=body
@ -425,15 +417,12 @@ class APIWebSocketRoute(routing.WebSocketRoute):
self.endpoint = endpoint
self.name = get_name(endpoint) if name is None else name
self.dependencies = list(dependencies or [])
self.path_regex, self.path_format, self.param_convertors = compile_path(
path)
self.dependant = get_dependant(
path=self.path_format, call=self.endpoint)
self.path_regex, self.path_format, self.param_convertors = compile_path(path)
self.dependant = get_dependant(path=self.path_format, call=self.endpoint)
for depends in self.dependencies[::-1]:
self.dependant.dependencies.insert(
0,
get_parameterless_sub_dependant(
depends=depends, path=self.path_format),
get_parameterless_sub_dependant(depends=depends, path=self.path_format),
)
self._flat_dependant = get_flat_dependant(self.dependant)
self._embed_body_fields = _should_embed_body_fields(
@ -517,8 +506,7 @@ class APIRoute(routing.Route):
self.tags = tags or []
self.responses = responses or {}
self.name = get_name(endpoint) if name is None else name
self.path_regex, self.path_format, self.param_convertors = compile_path(
path)
self.path_regex, self.path_format, self.param_convertors = compile_path(path)
if methods is None:
methods = ["GET"]
self.methods: Set[str] = {method.upper() for method in methods}
@ -558,15 +546,13 @@ class APIRoute(routing.Route):
self.response_field = None # type: ignore
self.secure_cloned_response_field = None
self.dependencies = list(dependencies or [])
self.description = description or inspect.cleandoc(
self.endpoint.__doc__ or "")
self.description = description or inspect.cleandoc(self.endpoint.__doc__ or "")
# if a "form feed" character (page break) is found in the description text,
# truncate description text to the content preceding the first "form feed"
self.description = self.description.split("\f")[0].strip()
response_fields = {}
for additional_status_code, response in self.responses.items():
assert isinstance(
response, dict), "An additional response must be a dict"
assert isinstance(response, dict), "An additional response must be a dict"
model = response.get("model")
if model:
assert is_body_allowed_for_status_code(additional_status_code), (
@ -578,19 +564,16 @@ class APIRoute(routing.Route):
)
response_fields[additional_status_code] = response_field
if response_fields:
self.response_fields: Dict[Union[int,
str], ModelField] = response_fields
self.response_fields: Dict[Union[int, str], ModelField] = response_fields
else:
self.response_fields = {}
assert callable(endpoint), "An endpoint must be a callable"
self.dependant = get_dependant(
path=self.path_format, call=self.endpoint)
self.dependant = get_dependant(path=self.path_format, call=self.endpoint)
for depends in self.dependencies[::-1]:
self.dependant.dependencies.insert(
0,
get_parameterless_sub_dependant(
depends=depends, path=self.path_format),
get_parameterless_sub_dependant(depends=depends, path=self.path_format),
)
self._flat_dependant = get_flat_dependant(self.dependant)
self._embed_body_fields = _should_embed_body_fields(
@ -657,8 +640,7 @@ class APIRouter(routing.Router):
def __init__(
self,
*,
prefix: Annotated[str, Doc(
"An optional path prefix for the router.")] = "",
prefix: Annotated[str, Doc("An optional path prefix for the router.")] = "",
tags: Annotated[
Optional[List[Union[str, Enum]]],
Doc(
@ -1159,8 +1141,7 @@ class APIRouter(routing.Router):
self,
router: Annotated["APIRouter", Doc("The `APIRouter` to include.")],
*,
prefix: Annotated[str, Doc(
"An optional path prefix for the router.")] = "",
prefix: Annotated[str, Doc("An optional path prefix for the router.")] = "",
tags: Annotated[
Optional[List[Union[str, Enum]]],
Doc(

View File

@ -1,3 +1,4 @@
from dirty_equals import IsDict
from fastapi import FastAPI
from fastapi.testclient import TestClient
from pydantic import BaseModel
@ -80,11 +81,30 @@ def test_json_decode_error_empty_body():
)
assert response.status_code == 422
error = response.json()["detail"][0]
# Empty body is handled differently, not as a JSON decode error
assert error["loc"] == ["body"]
assert error["type"] == "missing"
# Handle both Pydantic v1 and v2 - empty body is handled differently
assert response.json() == IsDict(
{
"detail": [
{
"loc": ["body"],
"msg": "Field required",
"type": "missing",
"input": None,
}
]
}
) | IsDict(
# Pydantic v1
{
"detail": [
{
"loc": ["body"],
"msg": "field required",
"type": "value_error.missing",
}
]
}
)
def test_json_decode_error_unclosed_brace():

View File

@ -60,8 +60,7 @@ def test_post_with_str_float_description(client: TestClient):
def test_post_with_str_float_description_tax(client: TestClient):
response = client.post(
"/items/",
json={"name": "Foo", "price": "50.5",
"description": "Some Foo", "tax": 0.3},
json={"name": "Foo", "price": "50.5", "description": "Some Foo", "tax": 0.3},
)
assert response.status_code == 200
assert response.json() == {