diff --git a/fastapi/routing.py b/fastapi/routing.py index a52271690f..e2c83aa7b3 100644 --- a/fastapi/routing.py +++ b/fastapi/routing.py @@ -30,6 +30,7 @@ from typing import ( import anyio from annotated_doc import Doc +from anyio.abc import ObjectReceiveStream from fastapi import params from fastapi._compat import ( ModelField, @@ -526,7 +527,10 @@ def get_request_handler( else: sse_aiter = iterate_in_threadpool(gen) - async def _async_stream_sse() -> AsyncIterator[bytes]: + @asynccontextmanager + async def _sse_producer_cm() -> AsyncIterator[ + ObjectReceiveStream[bytes] + ]: # Use a memory stream to decouple generator iteration # from the keepalive timer. A producer task pulls items # from the generator independently, so @@ -534,6 +538,13 @@ def get_request_handler( # `__anext__` directly - avoiding CancelledError that # would finalize the generator and also working for sync # generators running in a thread pool. + # + # This context manager is entered on the request-scoped + # AsyncExitStack so its __aexit__ (which cancels the + # task group) is called by the exit stack after the + # streaming response completes — not by async generator + # finalization via GeneratorExit. + # Ref: https://peps.python.org/pep-0789/ send_stream, receive_stream = anyio.create_memory_object_stream[ bytes ](max_buffer_size=1) @@ -543,25 +554,54 @@ def get_request_handler( async for raw_item in sse_aiter: await send_stream.send(_serialize_sse_item(raw_item)) - async with anyio.create_task_group() as tg: - tg.start_soon(_producer) - async with receive_stream: + send_keepalive, receive_keepalive = ( + anyio.create_memory_object_stream[bytes](max_buffer_size=1) + ) + + async def _keepalive_inserter() -> None: + """Read from the producer and forward to the output, + inserting keepalive comments on timeout.""" + async with send_keepalive, receive_stream: try: while True: try: with anyio.fail_after(_PING_INTERVAL): data = await receive_stream.receive() - yield data - # To allow for cancellation to trigger - # Ref: https://github.com/fastapi/fastapi/issues/14680 - await anyio.sleep(0) + await send_keepalive.send(data) except TimeoutError: - yield KEEPALIVE_COMMENT + await send_keepalive.send(KEEPALIVE_COMMENT) except anyio.EndOfStream: pass + async with anyio.create_task_group() as tg: + tg.start_soon(_producer) + tg.start_soon(_keepalive_inserter) + yield receive_keepalive + tg.cancel_scope.cancel() + + # Enter the SSE context manager on the request-scoped + # exit stack. The stack outlives the streaming response, + # so __aexit__ runs via proper structured teardown, not + # via GeneratorExit thrown into an async generator. + sse_receive_stream = await async_exit_stack.enter_async_context( + _sse_producer_cm() + ) + # Ensure the receive stream is closed when the exit stack + # unwinds, preventing ResourceWarning from __del__. + async_exit_stack.push_async_callback(sse_receive_stream.aclose) + + async def _sse_with_checkpoints( + stream: ObjectReceiveStream[bytes], + ) -> AsyncIterator[bytes]: + async for data in stream: + yield data + # Guarantee a checkpoint so cancellation can be + # delivered even when the producer is faster than + # the consumer and receive() never suspends. + await anyio.sleep(0) + sse_stream_content: AsyncIterator[bytes] | Iterator[bytes] = ( - _async_stream_sse() + _sse_with_checkpoints(sse_receive_stream) ) response = StreamingResponse(