From 3d26a09dc7b1a7c13da57fdd26d1cf22efa81229 Mon Sep 17 00:00:00 2001 From: R Date: Tue, 6 Jan 2026 16:17:13 +0100 Subject: [PATCH] server : add thinking content blocks to Anthropic Messages API (#18551) * server : add thinking content blocks to Anthropic Messages API Add support for returning reasoning/thinking content in Anthropic API responses when using models with --reasoning-format deepseek and the thinking parameter enabled. - Non-streaming: adds thinking block before text in content array - Streaming: emits thinking_delta events with correct block indices - Partial streaming: tracks reasoning state across chunks via anthropic_has_reasoning member variable Tested with bartowski/DeepSeek-R1-Distill-Qwen-7B-GGUF model. * server : fix Anthropic API streaming for thinking content blocks Add signature field and fix duplicate content_block_start events in Anthropic Messages API streaming responses for reasoning models. * server : refactor Anthropic streaming state to avoid raw pointer Replace raw pointer to task_result_state with direct field copies: - Copy state fields in update() before processing chunk - Use local copies in to_json_anthropic() instead of dereferencing - Pre-compute state updates for next chunk in update() This makes the data flow clearer and avoids unsafe pointer patterns. 
--- tools/server/server-task.cpp | 149 +++++++++++++++--- tools/server/server-task.h | 26 +++ .../tests/unit/test_compat_anthropic.py | 89 +++++++++++ 3 files changed, 246 insertions(+), 18 deletions(-) diff --git a/tools/server/server-task.cpp b/tools/server/server-task.cpp index 6d374131e3..ed4f6546ea 100644 --- a/tools/server/server-task.cpp +++ b/tools/server/server-task.cpp @@ -814,6 +814,15 @@ json server_task_result_cmpl_final::to_json_anthropic() { msg.content = content; } + // thinking block comes first (Anthropic extended thinking format) + if (!msg.reasoning_content.empty()) { + content_blocks.push_back({ + {"type", "thinking"}, + {"thinking", msg.reasoning_content}, + {"signature", ""} // empty signature for local models (no cryptographic verification) + }); + } + if (!msg.content.empty()) { content_blocks.push_back({ {"type", "text"}, @@ -862,20 +871,57 @@ json server_task_result_cmpl_final::to_json_anthropic_stream() { stop_reason = oaicompat_msg.tool_calls.empty() ? "end_turn" : "tool_use"; } - bool has_text = !oaicompat_msg.content.empty(); + bool has_thinking = !oaicompat_msg.reasoning_content.empty(); + bool has_text = !oaicompat_msg.content.empty(); size_t num_tool_calls = oaicompat_msg.tool_calls.size(); - bool text_block_started = false; + // content block indices: thinking (0) -> text (0 or 1) -> tool_use (n+) + size_t thinking_block_index = 0; + size_t text_block_index = has_thinking ? 
1 : 0; + + bool thinking_block_started = false; + bool text_block_started = false; std::unordered_set tool_calls_started; for (const auto & diff : oaicompat_msg_diffs) { + // handle thinking/reasoning content + if (!diff.reasoning_content_delta.empty()) { + if (!thinking_block_started) { + events.push_back({ + {"event", "content_block_start"}, + {"data", { + {"type", "content_block_start"}, + {"index", thinking_block_index}, + {"content_block", { + {"type", "thinking"}, + {"thinking", ""} + }} + }} + }); + thinking_block_started = true; + } + + events.push_back({ + {"event", "content_block_delta"}, + {"data", { + {"type", "content_block_delta"}, + {"index", thinking_block_index}, + {"delta", { + {"type", "thinking_delta"}, + {"thinking", diff.reasoning_content_delta} + }} + }} + }); + } + + // handle regular text content if (!diff.content_delta.empty()) { if (!text_block_started) { events.push_back({ {"event", "content_block_start"}, {"data", { {"type", "content_block_start"}, - {"index", 0}, + {"index", text_block_index}, {"content_block", { {"type", "text"}, {"text", ""} @@ -889,7 +935,7 @@ json server_task_result_cmpl_final::to_json_anthropic_stream() { {"event", "content_block_delta"}, {"data", { {"type", "content_block_delta"}, - {"index", 0}, + {"index", text_block_index}, {"delta", { {"type", "text_delta"}, {"text", diff.content_delta} @@ -898,8 +944,9 @@ json server_task_result_cmpl_final::to_json_anthropic_stream() { }); } + // handle tool calls if (diff.tool_call_index != std::string::npos) { - size_t content_block_index = (has_text ? 1 : 0) + diff.tool_call_index; + size_t content_block_index = (has_thinking ? 1 : 0) + (has_text ? 
1 : 0) + diff.tool_call_index; if (tool_calls_started.find(diff.tool_call_index) == tool_calls_started.end()) { const auto & full_tool_call = oaicompat_msg.tool_calls[diff.tool_call_index]; @@ -935,18 +982,42 @@ json server_task_result_cmpl_final::to_json_anthropic_stream() { } } + // close content blocks in order + if (has_thinking) { + // Anthropic API requires a signature_delta before closing thinking blocks + // We use an empty signature since we can't generate a cryptographic signature for local models + events.push_back({ + {"event", "content_block_delta"}, + {"data", { + {"type", "content_block_delta"}, + {"index", thinking_block_index}, + {"delta", { + {"type", "signature_delta"}, + {"signature", ""} + }} + }} + }); + events.push_back({ + {"event", "content_block_stop"}, + {"data", { + {"type", "content_block_stop"}, + {"index", thinking_block_index} + }} + }); + } + if (has_text) { events.push_back({ {"event", "content_block_stop"}, {"data", { {"type", "content_block_stop"}, - {"index", 0} + {"index", text_block_index} }} }); } for (size_t i = 0; i < num_tool_calls; i++) { - size_t content_block_index = (has_text ? 1 : 0) + i; + size_t content_block_index = (has_thinking ? 1 : 0) + (has_text ? 
1 : 0) + i; events.push_back({ {"event", "content_block_stop"}, {"data", { @@ -1154,11 +1225,10 @@ json server_task_result_rerank::to_json() { json server_task_result_cmpl_partial::to_json_anthropic() { json events = json::array(); bool first = (n_decoded == 1); - bool text_block_started = false; + // use member variables to track block state across streaming calls + // (anthropic_thinking_block_started, anthropic_text_block_started) if (first) { - text_block_started = false; - events.push_back({ {"event", "message_start"}, {"data", { @@ -1180,28 +1250,69 @@ json server_task_result_cmpl_partial::to_json_anthropic() { }); } + // content block indices: thinking (0) -> text (0 or 1) -> tool_use (n+) + size_t thinking_block_index = 0; + // use anthropic_has_reasoning (set in update()) to know if ANY reasoning was generated + size_t text_block_index = anthropic_has_reasoning ? 1 : 0; + + // use local copies of streaming state (copied from task_result_state in update()) + // these reflect the state BEFORE this chunk was processed + bool thinking_started = anthropic_thinking_block_started; + bool text_started = anthropic_text_block_started; + for (const auto & diff : oaicompat_msg_diffs) { - if (!diff.content_delta.empty()) { - if (!text_block_started) { + // handle thinking/reasoning content + if (!diff.reasoning_content_delta.empty()) { + if (!thinking_started) { events.push_back({ {"event", "content_block_start"}, {"data", { {"type", "content_block_start"}, - {"index", 0}, + {"index", thinking_block_index}, {"content_block", { - {"type", "text"}, - {"text", ""} + {"type", "thinking"}, + {"thinking", ""} }} }} }); - text_block_started = true; + thinking_started = true; } events.push_back({ {"event", "content_block_delta"}, {"data", { {"type", "content_block_delta"}, - {"index", 0}, + {"index", thinking_block_index}, + {"delta", { + {"type", "thinking_delta"}, + {"thinking", diff.reasoning_content_delta} + }} + }} + }); + } + + // handle regular text content + if 
(!diff.content_delta.empty()) { + if (!text_started) { + events.push_back({ + {"event", "content_block_start"}, + {"data", { + {"type", "content_block_start"}, + {"index", text_block_index}, + {"content_block", { + {"type", "text"}, + {"text", ""} + }} + }} + }); + text_started = true; + } + + events.push_back({ + {"event", "content_block_delta"}, + {"data", { + {"type", "content_block_delta"}, + {"index", text_block_index}, {"delta", { {"type", "text_delta"}, {"text", diff.content_delta} @@ -1210,8 +1321,10 @@ json server_task_result_cmpl_partial::to_json_anthropic() { }); } + // handle tool calls if (diff.tool_call_index != std::string::npos) { - size_t content_block_index = (text_block_started ? 1 : 0) + diff.tool_call_index; + // use anthropic_has_reasoning for thinking block count (persists across calls) + size_t content_block_index = (anthropic_has_reasoning ? 1 : 0) + (text_started ? 1 : 0) + diff.tool_call_index; if (!diff.tool_call_delta.name.empty()) { events.push_back({ diff --git a/tools/server/server-task.h b/tools/server/server-task.h index 687770de5e..ead1491182 100644 --- a/tools/server/server-task.h +++ b/tools/server/server-task.h @@ -96,6 +96,10 @@ struct task_result_state { std::string generated_text; // append new chunks of generated text here std::vector generated_tool_call_ids; + // for Anthropic API streaming: track content block state across chunks + bool anthropic_thinking_block_started = false; + bool anthropic_text_block_started = false; + task_result_state(const common_chat_syntax & oaicompat_chat_syntax) : oaicompat_chat_syntax(oaicompat_chat_syntax) {} @@ -337,6 +341,12 @@ struct server_task_result_cmpl_partial : server_task_result { std::vector oaicompat_msg_diffs; // to be populated by update() bool is_updated = false; + // for Anthropic API: track if any reasoning content has been generated + bool anthropic_has_reasoning = false; + // Streaming state copied from task_result_state for this chunk + bool 
anthropic_thinking_block_started = false; + bool anthropic_text_block_started = false; + virtual bool is_stop() override { return false; // in stream mode, partial responses are not considered stop } @@ -346,6 +356,22 @@ struct server_task_result_cmpl_partial : server_task_result { virtual void update(task_result_state & state) override { is_updated = true; state.update_chat_msg(content, true, oaicompat_msg_diffs); + // track if the accumulated message has any reasoning content + anthropic_has_reasoning = !state.chat_msg.reasoning_content.empty(); + + // Copy current state for use in to_json_anthropic() (reflects state BEFORE this chunk) + anthropic_thinking_block_started = state.anthropic_thinking_block_started; + anthropic_text_block_started = state.anthropic_text_block_started; + + // Pre-compute state updates based on diffs (for next chunk) + for (const auto & diff : oaicompat_msg_diffs) { + if (!diff.reasoning_content_delta.empty() && !state.anthropic_thinking_block_started) { + state.anthropic_thinking_block_started = true; + } + if (!diff.content_delta.empty() && !state.anthropic_text_block_started) { + state.anthropic_text_block_started = true; + } + } } json to_json_non_oaicompat(); diff --git a/tools/server/tests/unit/test_compat_anthropic.py b/tools/server/tests/unit/test_compat_anthropic.py index e0a003557e..e16e0235c6 100644 --- a/tools/server/tests/unit/test_compat_anthropic.py +++ b/tools/server/tests/unit/test_compat_anthropic.py @@ -805,3 +805,92 @@ def test_anthropic_vs_openai_different_response_format(): assert "input_tokens" in anthropic_res.body["usage"] assert "completion_tokens" in openai_res.body["usage"] assert "output_tokens" in anthropic_res.body["usage"] + + +# Extended thinking tests with reasoning models + +@pytest.mark.slow +@pytest.mark.parametrize("stream", [False, True]) +def test_anthropic_thinking_with_reasoning_model(stream): + """Test that thinking content blocks are properly returned for reasoning models""" + global server + 
server = ServerProcess() + server.model_hf_repo = "bartowski/DeepSeek-R1-Distill-Qwen-7B-GGUF" + server.model_hf_file = "DeepSeek-R1-Distill-Qwen-7B-Q4_K_M.gguf" + server.reasoning_format = "deepseek" + server.jinja = True + server.n_ctx = 8192 + server.n_predict = 1024 + server.server_port = 8084 + server.start(timeout_seconds=600) # large model needs time to download + + if stream: + res = server.make_stream_request("POST", "/v1/messages", data={ + "model": "test", + "max_tokens": 1024, + "thinking": { + "type": "enabled", + "budget_tokens": 500 + }, + "messages": [ + {"role": "user", "content": "What is 2+2?"} + ], + "stream": True + }) + + events = list(res) + + # should have thinking content block events + thinking_starts = [e for e in events if + e.get("type") == "content_block_start" and + e.get("content_block", {}).get("type") == "thinking"] + assert len(thinking_starts) > 0, "Should have thinking content_block_start event" + assert thinking_starts[0]["index"] == 0, "Thinking block should be at index 0" + + # should have thinking_delta events + thinking_deltas = [e for e in events if + e.get("type") == "content_block_delta" and + e.get("delta", {}).get("type") == "thinking_delta"] + assert len(thinking_deltas) > 0, "Should have thinking_delta events" + + # should have signature_delta event before thinking block closes (Anthropic API requirement) + signature_deltas = [e for e in events if + e.get("type") == "content_block_delta" and + e.get("delta", {}).get("type") == "signature_delta"] + assert len(signature_deltas) > 0, "Should have signature_delta event for thinking block" + + # should have text block after thinking + text_starts = [e for e in events if + e.get("type") == "content_block_start" and + e.get("content_block", {}).get("type") == "text"] + assert len(text_starts) > 0, "Should have text content_block_start event" + assert text_starts[0]["index"] == 1, "Text block should be at index 1 (after thinking)" + else: + res = server.make_request("POST", 
"/v1/messages", data={ + "model": "test", + "max_tokens": 1024, + "thinking": { + "type": "enabled", + "budget_tokens": 500 + }, + "messages": [ + {"role": "user", "content": "What is 2+2?"} + ] + }) + + assert res.status_code == 200 + assert res.body["type"] == "message" + + content = res.body["content"] + assert len(content) >= 2, "Should have at least thinking and text blocks" + + # first block should be thinking + thinking_blocks = [b for b in content if b.get("type") == "thinking"] + assert len(thinking_blocks) > 0, "Should have thinking content block" + assert "thinking" in thinking_blocks[0], "Thinking block should have 'thinking' field" + assert len(thinking_blocks[0]["thinking"]) > 0, "Thinking content should not be empty" + assert "signature" in thinking_blocks[0], "Thinking block should have 'signature' field (Anthropic API requirement)" + + # should also have text block + text_blocks = [b for b in content if b.get("type") == "text"] + assert len(text_blocks) > 0, "Should have text content block"