This commit is contained in:
Matthew Batema 2026-02-07 08:13:16 +00:00 committed by GitHub
commit fcf42ae8be
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 146 additions and 23 deletions

View File

@ -65,6 +65,17 @@ class UploadFile(StarletteUploadFile):
Optional[str], Doc("The content type of the request, from the headers.") Optional[str], Doc("The content type of the request, from the headers.")
] ]
@classmethod
def from_starlette(
cls: type["UploadFile"], starlette_uploadfile: StarletteUploadFile
) -> "UploadFile":
return cls(
file=starlette_uploadfile.file,
size=starlette_uploadfile.size,
filename=starlette_uploadfile.filename,
headers=starlette_uploadfile.headers,
)
async def write( async def write(
self, self,
data: Annotated[ data: Annotated[

View File

@ -44,6 +44,7 @@ from fastapi.concurrency import (
asynccontextmanager, asynccontextmanager,
contextmanager_in_threadpool, contextmanager_in_threadpool,
) )
from fastapi.datastructures import UploadFile as FastAPIUploadFile
from fastapi.dependencies.models import Dependant from fastapi.dependencies.models import Dependant
from fastapi.exceptions import DependencyScopeError from fastapi.exceptions import DependencyScopeError
from fastapi.logger import logger from fastapi.logger import logger
@ -882,31 +883,33 @@ async def _extract_form_body(
for field in body_fields: for field in body_fields:
value = _get_multidict_value(field, received_body) value = _get_multidict_value(field, received_body)
field_info = field.field_info field_info = field.field_info
if ( if isinstance(field_info, params.File) and isinstance(value, UploadFile):
isinstance(field_info, params.File) if is_bytes_field(field):
and is_bytes_field(field) value = await value.read()
and isinstance(value, UploadFile) else:
): value = FastAPIUploadFile.from_starlette(value)
value = await value.read() elif isinstance(field_info, params.File) and value_is_sequence(value):
elif ( if is_bytes_sequence_field(field):
is_bytes_sequence_field(field) # For types
and isinstance(field_info, params.File) assert isinstance(value, sequence_types)
and value_is_sequence(value) results: list[Union[bytes, str]] = []
):
# For types
assert isinstance(value, sequence_types)
results: list[Union[bytes, str]] = []
async def process_fn( async def process_fn(
fn: Callable[[], Coroutine[Any, Any, Any]], fn: Callable[[], Coroutine[Any, Any, Any]],
) -> None: ) -> None:
result = await fn() result = await fn()
results.append(result) # noqa: B023 results.append(result) # noqa: B023
async with anyio.create_task_group() as tg: async with anyio.create_task_group() as tg:
for sub_value in value: for sub_value in value:
tg.start_soon(process_fn, sub_value.read) tg.start_soon(process_fn, sub_value.read)
value = serialize_sequence_value(field=field, value=results) value = serialize_sequence_value(field=field, value=results)
else:
value = [
FastAPIUploadFile.from_starlette(sub_value)
for sub_value in value
if isinstance(sub_value, UploadFile)
]
if value is not None: if value is not None:
values[get_validation_alias(field)] = value values[get_validation_alias(field)] = value
field_aliases = {get_validation_alias(field) for field in body_fields} field_aliases = {get_validation_alias(field) for field in body_fields}

View File

@ -0,0 +1,109 @@
import io
from typing import Any
import pytest
from fastapi import Depends, FastAPI, File, UploadFile
from fastapi.testclient import TestClient
from starlette.datastructures import UploadFile as StarletteUploadFile
app = FastAPI()
@app.post("/uploadfile")
async def uploadfile(uploadfile: UploadFile = File(...)) -> dict[str, Any]:
return {
"filename": uploadfile.filename,
"is_fastapi_uploadfile": isinstance(uploadfile, UploadFile),
"is_starlette_uploadfile": isinstance(uploadfile, StarletteUploadFile),
"class": f"{uploadfile.__class__.__module__}.{uploadfile.__class__.__name__}",
}
@app.post("/uploadfiles")
async def uploadfiles(
uploadfiles: list[UploadFile] = File(...),
) -> list[dict[str, Any]]:
return [
{
"filename": uploadfile.filename,
"is_fastapi_uploadfile": isinstance(uploadfile, UploadFile),
"is_starlette_uploadfile": isinstance(uploadfile, StarletteUploadFile),
"class": f"{uploadfile.__class__.__module__}.{uploadfile.__class__.__name__}",
}
for uploadfile in uploadfiles
]
async def get_uploadfile_info(uploadfile: UploadFile = File(...)) -> dict[str, Any]:
return {
"filename": uploadfile.filename,
"is_fastapi_uploadfile": isinstance(uploadfile, UploadFile),
"is_starlette_uploadfile": isinstance(uploadfile, StarletteUploadFile),
"class": f"{uploadfile.__class__.__module__}.{uploadfile.__class__.__name__}",
}
@app.post("/uploadfile-dep")
async def uploadfile_dep(
uploadfile_info: dict[str, Any] = Depends(get_uploadfile_info),
) -> dict[str, Any]:
return uploadfile_info
async def get_uploadfiles_info(
uploadfiles: list[UploadFile] = File(...),
) -> list[dict[str, Any]]:
return [
{
"filename": uploadfile.filename,
"is_fastapi_uploadfile": isinstance(uploadfile, UploadFile),
"is_starlette_uploadfile": isinstance(uploadfile, StarletteUploadFile),
"class": f"{uploadfile.__class__.__module__}.{uploadfile.__class__.__name__}",
}
for uploadfile in uploadfiles
]
@app.post("/uploadfiles-dep")
async def uploadfiles_dep(
uploadfiles_info: list[dict[str, Any]] = Depends(get_uploadfiles_info),
) -> list[dict[str, Any]]:
return uploadfiles_info
@pytest.mark.parametrize("endpoint", ["/uploadfile", "/uploadfile-dep"])
def test_uploadfile_type(endpoint: str) -> None:
client = TestClient(app)
files = {"uploadfile": ("example.txt", io.BytesIO(b"test content"), "text/plain")}
response = client.post(f"{endpoint}", files=files)
data = response.json()
assert data["filename"] == "example.txt"
assert data["is_fastapi_uploadfile"] is True
assert data["is_starlette_uploadfile"] is True
assert data["class"].startswith("fastapi.")
@pytest.mark.parametrize("endpoint", ["/uploadfiles", "/uploadfiles-dep"])
def test_uploadfiles_type(endpoint: str) -> None:
client = TestClient(app)
files = [
("uploadfiles", ("example.txt", io.BytesIO(b"test content"), "text/plain")),
("uploadfiles", ("example2.txt", io.BytesIO(b"test content"), "text/plain")),
]
response = client.post(f"{endpoint}", files=files)
files_data = response.json()
assert len(files_data) == 2
file1 = files_data[0]
assert file1["filename"] == "example.txt"
assert file1["is_fastapi_uploadfile"] is True
assert file1["is_starlette_uploadfile"] is True
assert file1["class"].startswith("fastapi.")
file2 = files_data[1]
assert file2["filename"] == "example2.txt"
assert file2["is_fastapi_uploadfile"] is True
assert file2["is_starlette_uploadfile"] is True
assert file2["class"].startswith("fastapi.")