🐛 Fix, avoid yield from a TaskGroup, only as an async context manager, closed in the request async exit stack (#15038)

This commit is contained in:
Sebastián Ramírez 2026-03-01 10:16:03 -08:00 committed by GitHub
parent 6038507823
commit 8a9258b169
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 50 additions and 10 deletions

View File

@ -30,6 +30,7 @@ from typing import (
import anyio
from annotated_doc import Doc
from anyio.abc import ObjectReceiveStream
from fastapi import params
from fastapi._compat import (
ModelField,
@ -526,7 +527,10 @@ def get_request_handler(
else:
sse_aiter = iterate_in_threadpool(gen)
async def _async_stream_sse() -> AsyncIterator[bytes]:
@asynccontextmanager
async def _sse_producer_cm() -> AsyncIterator[
ObjectReceiveStream[bytes]
]:
# Use a memory stream to decouple generator iteration
# from the keepalive timer. A producer task pulls items
# from the generator independently, so
@ -534,6 +538,13 @@ def get_request_handler(
# `__anext__` directly - avoiding CancelledError that
# would finalize the generator and also working for sync
# generators running in a thread pool.
#
# This context manager is entered on the request-scoped
# AsyncExitStack so its __aexit__ (which cancels the
# task group) is called by the exit stack after the
# streaming response completes — not by async generator
# finalization via GeneratorExit.
# Ref: https://peps.python.org/pep-0789/
send_stream, receive_stream = anyio.create_memory_object_stream[
bytes
](max_buffer_size=1)
@ -543,25 +554,54 @@ def get_request_handler(
async for raw_item in sse_aiter:
await send_stream.send(_serialize_sse_item(raw_item))
async with anyio.create_task_group() as tg:
tg.start_soon(_producer)
async with receive_stream:
send_keepalive, receive_keepalive = (
anyio.create_memory_object_stream[bytes](max_buffer_size=1)
)
async def _keepalive_inserter() -> None:
"""Read from the producer and forward to the output,
inserting keepalive comments on timeout."""
async with send_keepalive, receive_stream:
try:
while True:
try:
with anyio.fail_after(_PING_INTERVAL):
data = await receive_stream.receive()
yield data
# To allow for cancellation to trigger
# Ref: https://github.com/fastapi/fastapi/issues/14680
await anyio.sleep(0)
await send_keepalive.send(data)
except TimeoutError:
yield KEEPALIVE_COMMENT
await send_keepalive.send(KEEPALIVE_COMMENT)
except anyio.EndOfStream:
pass
async with anyio.create_task_group() as tg:
tg.start_soon(_producer)
tg.start_soon(_keepalive_inserter)
yield receive_keepalive
tg.cancel_scope.cancel()
# Enter the SSE context manager on the request-scoped
# exit stack. The stack outlives the streaming response,
# so __aexit__ runs via proper structured teardown, not
# via GeneratorExit thrown into an async generator.
sse_receive_stream = await async_exit_stack.enter_async_context(
_sse_producer_cm()
)
# Ensure the receive stream is closed when the exit stack
# unwinds, preventing ResourceWarning from __del__.
async_exit_stack.push_async_callback(sse_receive_stream.aclose)
async def _sse_with_checkpoints(
stream: ObjectReceiveStream[bytes],
) -> AsyncIterator[bytes]:
async for data in stream:
yield data
# Guarantee a checkpoint so cancellation can be
# delivered even when the producer is faster than
# the consumer and receive() never suspends.
await anyio.sleep(0)
sse_stream_content: AsyncIterator[bytes] | Iterator[bytes] = (
_async_stream_sse()
_sse_with_checkpoints(sse_receive_stream)
)
response = StreamingResponse(