From 8a9258b169dce3e321f614c14b1877c18750d6c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebasti=C3=A1n=20Ram=C3=ADrez?= Date: Sun, 1 Mar 2026 10:16:03 -0800 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20Fix,=20avoid=20yield=20from=20a?= =?UTF-8?q?=20TaskGroup,=20only=20as=20an=20async=20context=20manager,=20c?= =?UTF-8?q?losed=20in=20the=20request=20async=20exit=20stack=20(#15038)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- fastapi/routing.py | 60 ++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 50 insertions(+), 10 deletions(-) diff --git a/fastapi/routing.py b/fastapi/routing.py index a52271690f..e2c83aa7b3 100644 --- a/fastapi/routing.py +++ b/fastapi/routing.py @@ -30,6 +30,7 @@ from typing import ( import anyio from annotated_doc import Doc +from anyio.abc import ObjectReceiveStream from fastapi import params from fastapi._compat import ( ModelField, @@ -526,7 +527,10 @@ def get_request_handler( else: sse_aiter = iterate_in_threadpool(gen) - async def _async_stream_sse() -> AsyncIterator[bytes]: + @asynccontextmanager + async def _sse_producer_cm() -> AsyncIterator[ + ObjectReceiveStream[bytes] + ]: # Use a memory stream to decouple generator iteration # from the keepalive timer. A producer task pulls items # from the generator independently, so @@ -534,6 +538,13 @@ def get_request_handler( # `__anext__` directly - avoiding CancelledError that # would finalize the generator and also working for sync # generators running in a thread pool. + # + # This context manager is entered on the request-scoped + # AsyncExitStack so its __aexit__ (which cancels the + # task group) is called by the exit stack after the + # streaming response completes — not by async generator + # finalization via GeneratorExit. + # Ref: https://peps.python.org/pep-0789/ send_stream, receive_stream = anyio.create_memory_object_stream[ bytes ](max_buffer_size=1) @@ -543,25 +554,54 @@ def get_request_handler( async for raw_item in sse_aiter: await send_stream.send(_serialize_sse_item(raw_item)) - async with anyio.create_task_group() as tg: - tg.start_soon(_producer) - async with receive_stream: + send_keepalive, receive_keepalive = ( + anyio.create_memory_object_stream[bytes](max_buffer_size=1) + ) + + async def _keepalive_inserter() -> None: + """Read from the producer and forward to the output, + inserting keepalive comments on timeout.""" + async with send_keepalive, receive_stream: try: while True: try: with anyio.fail_after(_PING_INTERVAL): data = await receive_stream.receive() - yield data - # To allow for cancellation to trigger - # Ref: https://github.com/fastapi/fastapi/issues/14680 - await anyio.sleep(0) + await send_keepalive.send(data) except TimeoutError: - yield KEEPALIVE_COMMENT + await send_keepalive.send(KEEPALIVE_COMMENT) except anyio.EndOfStream: pass + async with anyio.create_task_group() as tg: + tg.start_soon(_producer) + tg.start_soon(_keepalive_inserter) + yield receive_keepalive + tg.cancel_scope.cancel() + + # Enter the SSE context manager on the request-scoped + # exit stack. The stack outlives the streaming response, + # so __aexit__ runs via proper structured teardown, not + # via GeneratorExit thrown into an async generator. + sse_receive_stream = await async_exit_stack.enter_async_context( + _sse_producer_cm() + ) + # Ensure the receive stream is closed when the exit stack + # unwinds, preventing ResourceWarning from __del__. + async_exit_stack.push_async_callback(sse_receive_stream.aclose) + + async def _sse_with_checkpoints( + stream: ObjectReceiveStream[bytes], + ) -> AsyncIterator[bytes]: + async for data in stream: + yield data + # Guarantee a checkpoint so cancellation can be + # delivered even when the producer is faster than + # the consumer and receive() never suspends. + await anyio.sleep(0) + sse_stream_content: AsyncIterator[bytes] | Iterator[bytes] = ( - _async_stream_sse() + _sse_with_checkpoints(sse_receive_stream) ) response = StreamingResponse(