diff --git a/crates/service/src/gateway/observability/http_bridge/delivery.rs b/crates/service/src/gateway/observability/http_bridge/delivery.rs index 2112cfa1e..2403ce769 100644 --- a/crates/service/src/gateway/observability/http_bridge/delivery.rs +++ b/crates/service/src/gateway/observability/http_bridge/delivery.rs @@ -9,14 +9,14 @@ use crate::gateway::upstream::GatewayStreamResponse; use super::super::{GeminiStreamOutputMode, ResponseAdapter, ToolNameRestoreMap}; use super::{ - build_images_api_response, collect_image_generation_chat_images, + append_output_text, build_images_api_response, collect_image_generation_chat_images, collect_non_stream_json_from_sse_bytes, collect_response_reasoning_summary_text, extract_error_hint_from_body, extract_error_message_from_json, looks_like_sse_payload, merge_usage, parse_usage_from_json, push_trace_id_header, usage_has_signal, AnthropicSseReader, ChatCompletionsFromResponsesSseReader, GeminiSseReader, ImagesFromResponsesSseReader, ImagesResponseFormat, OpenAIResponsesPassthroughSseReader, PassthroughSseCollector, - PassthroughSseProtocol, PassthroughSseUsageReader, SseKeepAliveFrame, - UpstreamResponseBridgeResult, UpstreamResponseUsage, + PassthroughSseProtocol, PassthroughSseUsageReader, ResponsesFromAnthropicSseReader, + SseKeepAliveFrame, UpstreamResponseBridgeResult, UpstreamResponseUsage, }; const REQUEST_ID_HEADER_CANDIDATES: &[&str] = &["x-request-id", "x-oai-request-id"]; @@ -153,6 +153,14 @@ where ) } +#[cfg(test)] +fn response_adapter_uses_manual_chunked_streaming(response_adapter: ResponseAdapter) -> bool { + matches!( + response_adapter, + ResponseAdapter::ResponsesFromAnthropicMessages + ) +} + /// 函数 `compact_debug_suffix` /// /// 作者: gaohongshun @@ -342,6 +350,33 @@ fn anthropic_usage_from_responses(value: &Value) -> Value { Value::Object(obj) } +fn responses_usage_from_anthropic(value: &Value) -> Value { + let usage = value.get("usage").cloned().unwrap_or(Value::Null); + let input_tokens = usage + .get("input_tokens") + .and_then(Value::as_i64) + .unwrap_or_default(); + let output_tokens = usage + .get("output_tokens") + .and_then(Value::as_i64) + .unwrap_or_default(); + let cached_tokens = usage + .get("cache_read_input_tokens") + .and_then(Value::as_i64) + .unwrap_or_default(); + let reasoning_tokens = usage + .get("reasoning_output_tokens") + .and_then(Value::as_i64) + .unwrap_or_default(); + json!({ + "input_tokens": input_tokens, + "output_tokens": output_tokens, + "total_tokens": input_tokens + output_tokens, + "input_tokens_details": { "cached_tokens": cached_tokens }, + "output_tokens_details": { "reasoning_tokens": reasoning_tokens }, + }) +} + fn gemini_usage_from_responses(value: &Value) -> Value { let usage = value.get("usage").cloned().unwrap_or(Value::Null); let prompt = usage @@ -496,6 +531,77 @@ fn convert_responses_body_to_anthropic_messages( serde_json::to_vec(&payload).ok() } +fn convert_anthropic_messages_body_to_responses(body: &[u8]) -> Option> { + let value = serde_json::from_slice::(body).ok()?; + let response_id = value + .get("id") + .and_then(Value::as_str) + .unwrap_or("resp_codexmanager"); + let model = value.get("model").and_then(Value::as_str).unwrap_or(""); + let mut output = Vec::new(); + let mut output_text = String::new(); + if let Some(content) = value.get("content").and_then(Value::as_array) { + for item in content { + let Some(obj) = item.as_object() else { + continue; + }; + match obj.get("type").and_then(Value::as_str).unwrap_or_default() { + "text" => { + if let Some(text) = obj.get("text").and_then(Value::as_str) { + append_output_text(&mut output_text, text); + } + } + "tool_use" => { + output.push(json!({ + "id": obj + .get("id") + .and_then(Value::as_str) + .unwrap_or("toolu_unknown"), + "type": "function_call", + "status": "completed", + "call_id": obj + .get("id") + .and_then(Value::as_str) + .unwrap_or("toolu_unknown"), + "name": obj + .get("name") + .and_then(Value::as_str) + .unwrap_or("tool"), + "arguments": obj + .get("input") + .cloned() + .unwrap_or_else(|| json!({})) + .to_string(), + })); + } + _ => {} + } + } + } + if !output_text.is_empty() { + output.insert( + 0, + json!({ + "id": format!("msg_{response_id}"), + "type": "message", + "status": "completed", + "role": "assistant", + "content": [{ "type": "output_text", "text": output_text }], + }), + ); + } + let payload = json!({ + "id": response_id, + "object": "response", + "created_at": 0, + "status": "completed", + "model": model, + "output": output, + "usage": responses_usage_from_anthropic(&value), + }); + serde_json::to_vec(&payload).ok() +} + fn convert_responses_body_to_gemini_generate_content( body: &[u8], wrap_response_envelope: bool, @@ -753,6 +859,9 @@ fn convert_success_body_for_adapter( ResponseAdapter::AnthropicMessagesFromResponses => { convert_responses_body_to_anthropic_messages(body, tool_name_restore_map) } + ResponseAdapter::ResponsesFromAnthropicMessages => { + convert_anthropic_messages_body_to_responses(body) + } ResponseAdapter::ChatCompletionsFromResponses => { convert_responses_body_to_chat_completions(body) } @@ -1040,7 +1149,8 @@ fn convert_error_body_for_adapter(response_adapter: ResponseAdapter, message: &s ResponseAdapter::AnthropicMessagesFromResponses => { convert_upstream_error_to_anthropic_body(message) } - ResponseAdapter::ChatCompletionsFromResponses => serde_json::to_vec(&json!({ + ResponseAdapter::ResponsesFromAnthropicMessages + | ResponseAdapter::ChatCompletionsFromResponses => serde_json::to_vec(&json!({ "error": { "message": message, "type": "upstream_error", @@ -1080,7 +1190,8 @@ fn compatibility_stream_content_type( ) -> &'static str { match response_adapter { ResponseAdapter::AnthropicMessagesFromResponses => "text/event-stream", - ResponseAdapter::ChatCompletionsFromResponses => "text/event-stream", + ResponseAdapter::ResponsesFromAnthropicMessages + | ResponseAdapter::ChatCompletionsFromResponses => "text/event-stream", ResponseAdapter::CompactFromChatCompletions => "application/json", ResponseAdapter::ImagesB64JsonFromResponses | ResponseAdapter::ImagesUrlFromResponses => { "text/event-stream" @@ -2174,6 +2285,46 @@ pub(crate) fn respond_with_upstream( None, )); } + ResponseAdapter::ResponsesFromAnthropicMessages => { + let usage_collector = Arc::new(Mutex::new(UpstreamResponseUsage::default())); + let response_body: Box = + Box::new(ResponsesFromAnthropicSseReader::new( + upstream, + Arc::clone(&usage_collector), + fallback_model, + request_started_at, + )); + let delivery_error = + respond_streaming_chunked(request, status, headers, response_body) + .err() + .map(|err| err.to_string()); + let usage = usage_collector + .lock() + .map(|guard| guard.clone()) + .unwrap_or_default(); + return Ok(with_bridge_debug_meta( + UpstreamResponseBridgeResult { + usage, + stream_terminal_seen: true, + stream_terminal_error: None, + delivery_error, + upstream_error_hint: None, + delivered_status_code: None, + upstream_request_id: None, + upstream_cf_ray: None, + upstream_auth_error: None, + upstream_identity_error_code: None, + upstream_content_type: None, + last_sse_event_type: None, + }, + &upstream_request_id, + &upstream_cf_ray, + &upstream_auth_error, + &upstream_identity_error_code, + &upstream_content_type, + None, + )); + } ResponseAdapter::ChatCompletionsFromResponses => { let usage_collector = Arc::new(Mutex::new(PassthroughSseCollector::default())); let response_body: Box = @@ -2900,6 +3051,7 @@ pub(crate) fn respond_with_upstream( )) } ResponseAdapter::AnthropicMessagesFromResponses + | ResponseAdapter::ResponsesFromAnthropicMessages | ResponseAdapter::ChatCompletionsFromResponses | ResponseAdapter::ImagesB64JsonFromResponses | ResponseAdapter::ImagesUrlFromResponses @@ -3151,6 +3303,49 @@ pub(crate) fn respond_with_stream_upstream( None, )); } + ResponseAdapter::ResponsesFromAnthropicMessages => { + let upstream_body = upstream + .read_all_bytes() + .map_err(|err| format!("read upstream body failed: {err}"))?; + let usage_collector = Arc::new(Mutex::new(UpstreamResponseUsage::default())); + let response_body: Box = + Box::new(ResponsesFromAnthropicSseReader::from_reader( + std::io::Cursor::new(upstream_body.to_vec()), + Arc::clone(&usage_collector), + fallback_model, + request_started_at, + )); + let delivery_error = + respond_streaming_chunked(request, status, headers, response_body) + .err() + .map(|err| err.to_string()); + let usage = usage_collector + .lock() + .map(|guard| guard.clone()) + .unwrap_or_default(); + return Ok(with_bridge_debug_meta( + UpstreamResponseBridgeResult { + usage, + stream_terminal_seen: true, + stream_terminal_error: None, + delivery_error, + upstream_error_hint: None, + delivered_status_code: None, + upstream_request_id: None, + upstream_cf_ray: None, + upstream_auth_error: None, + upstream_identity_error_code: None, + upstream_content_type: None, + last_sse_event_type: None, + }, + &upstream_request_id, + &upstream_cf_ray, + &upstream_auth_error, + &upstream_identity_error_code, + &upstream_content_type, + None, + )); + } ResponseAdapter::ChatCompletionsFromResponses => { let upstream_body = upstream .read_all_bytes() @@ -3846,6 +4041,7 @@ pub(crate) fn respond_with_stream_upstream( )) } ResponseAdapter::AnthropicMessagesFromResponses + | ResponseAdapter::ResponsesFromAnthropicMessages | ResponseAdapter::ChatCompletionsFromResponses | ResponseAdapter::ImagesB64JsonFromResponses | ResponseAdapter::ImagesUrlFromResponses @@ -3881,6 +4077,7 @@ fn resolve_stream_keepalive_frame( } } ResponseAdapter::AnthropicMessagesFromResponses + | ResponseAdapter::ResponsesFromAnthropicMessages | ResponseAdapter::ChatCompletionsFromResponses | ResponseAdapter::CompactFromChatCompletions | ResponseAdapter::ImagesB64JsonFromResponses @@ -3901,8 +4098,9 @@ mod tests { convert_responses_body_to_chat_completions, convert_responses_body_to_gemini_generate_content, convert_responses_body_to_images, force_openai_responses_stream_content_type, gemini_cli_wrap_response_envelope, - merge_usage_from_body_without_output_text, write_streaming_chunked_response, HTTPVersion, - Header, ImagesResponseFormat, ResponseAdapter, StatusCode, + merge_usage_from_body_without_output_text, response_adapter_uses_manual_chunked_streaming, + write_streaming_chunked_response, HTTPVersion, Header, ImagesResponseFormat, + ResponseAdapter, StatusCode, }; use serde_json::json; use std::io::{Read, Write}; @@ -3979,6 +4177,13 @@ mod tests { assert!(writer.flushes >= 4); } + #[test] + fn responses_from_anthropic_streaming_uses_manual_chunked_delivery() { + assert!(response_adapter_uses_manual_chunked_streaming( + ResponseAdapter::ResponsesFromAnthropicMessages + )); + } + /// 函数 `compact_header_only_identity_error_is_normalized_and_classified` /// /// 作者: gaohongshun diff --git a/crates/service/src/gateway/observability/http_bridge/mod.rs b/crates/service/src/gateway/observability/http_bridge/mod.rs index c04eb9ead..3643936e2 100644 --- a/crates/service/src/gateway/observability/http_bridge/mod.rs +++ b/crates/service/src/gateway/observability/http_bridge/mod.rs @@ -412,7 +412,7 @@ pub(super) fn respond_with_upstream( pub(super) use stream_readers::{ ChatCompletionsFromResponsesSseReader, ImagesFromResponsesSseReader, OpenAIResponsesPassthroughSseReader, PassthroughSseCollector, PassthroughSseUsageReader, - SseKeepAliveFrame, + ResponsesFromAnthropicSseReader, SseKeepAliveFrame, }; pub(super) use stream_readers::{AnthropicSseReader, GeminiSseReader}; diff --git a/crates/service/src/gateway/observability/http_bridge/stream_readers.rs b/crates/service/src/gateway/observability/http_bridge/stream_readers.rs index e35492902..db88f3d8d 100644 --- a/crates/service/src/gateway/observability/http_bridge/stream_readers.rs +++ b/crates/service/src/gateway/observability/http_bridge/stream_readers.rs @@ -27,6 +27,8 @@ mod images; mod openai_responses; #[path = "stream_readers/passthrough.rs"] mod passthrough; +#[path = "stream_readers/responses_from_anthropic.rs"] +mod responses_from_anthropic; pub(crate) use anthropic::AnthropicSseReader; pub(crate) use chat_completions::ChatCompletionsFromResponsesSseReader; @@ -43,6 +45,7 @@ pub(crate) use gemini::GeminiSseReader; pub(crate) use images::ImagesFromResponsesSseReader; pub(crate) use openai_responses::OpenAIResponsesPassthroughSseReader; pub(crate) use passthrough::PassthroughSseUsageReader; +pub(crate) use responses_from_anthropic::ResponsesFromAnthropicSseReader; /// 函数 `reload_from_env` /// diff --git a/crates/service/src/gateway/observability/http_bridge/stream_readers/responses_from_anthropic.rs b/crates/service/src/gateway/observability/http_bridge/stream_readers/responses_from_anthropic.rs new file mode 100644 index 000000000..f0553ec98 --- /dev/null +++ b/crates/service/src/gateway/observability/http_bridge/stream_readers/responses_from_anthropic.rs @@ -0,0 +1,650 @@ +use super::{ + append_output_text, json, mark_first_response_ms_on_usage, should_emit_keepalive, + stream_idle_timed_out, stream_wait_timeout, Arc, Cursor, Map, Mutex, Read, SseKeepAliveFrame, + UpstreamResponseUsage, UpstreamSseFramePump, UpstreamSseFramePumpItem, Value, +}; +use std::time::Instant; + +pub(crate) struct ResponsesFromAnthropicSseReader { + upstream: UpstreamSseFramePump, + out_cursor: Cursor>, + state: ResponsesFromAnthropicState, + usage_collector: Arc>, + request_started_at: Instant, + last_upstream_activity: Instant, + saw_upstream_frame: bool, +} + +#[derive(Default)] +struct ResponsesFromAnthropicState { + response_id: Option, + model: Option, + started: bool, + text_item_started: bool, + text_part_started: bool, + text_finished: bool, + completed: bool, + output_text: String, + input_tokens: i64, + cached_input_tokens: i64, + output_tokens: i64, + total_tokens: Option, + reasoning_output_tokens: i64, + stop_reason: String, + current_tool: Option, + completed_tools: Vec, +} + +#[derive(Default)] +struct PendingToolUse { + id: String, + name: String, + input_json: String, +} + +impl ResponsesFromAnthropicSseReader { + pub(crate) fn from_reader( + upstream: R, + usage_collector: Arc>, + fallback_model: Option<&str>, + request_started_at: Instant, + ) -> Self + where + R: Read + Send + 'static, + { + let mut state = ResponsesFromAnthropicState { + stop_reason: "stop".to_string(), + ..Default::default() + }; + state.model = fallback_model + .map(str::trim) + .filter(|value| !value.is_empty()) + .map(str::to_string); + Self { + upstream: UpstreamSseFramePump::from_reader(upstream), + out_cursor: Cursor::new(Vec::new()), + state, + usage_collector, + request_started_at, + last_upstream_activity: Instant::now(), + saw_upstream_frame: false, + } + } + + pub(crate) fn new( + upstream: reqwest::blocking::Response, + usage_collector: Arc>, + fallback_model: Option<&str>, + request_started_at: Instant, + ) -> Self { + Self::from_reader( + upstream, + usage_collector, + fallback_model, + request_started_at, + ) + } + + fn next_chunk(&mut self) -> std::io::Result> { + loop { + match self + .upstream + .recv_timeout(stream_wait_timeout(self.last_upstream_activity)) + { + Ok(UpstreamSseFramePumpItem::Frame(frame)) => { + self.last_upstream_activity = Instant::now(); + self.saw_upstream_frame = true; + let mapped = self.process_sse_frame(&frame); + if !mapped.is_empty() { + mark_first_response_ms_on_usage( + &self.usage_collector, + self.request_started_at, + ); + return Ok(mapped); + } + } + Ok(UpstreamSseFramePumpItem::Eof) + | Ok(UpstreamSseFramePumpItem::Error(_)) + | Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => { + let finished = self.finish_stream(); + if !finished.is_empty() { + mark_first_response_ms_on_usage( + &self.usage_collector, + self.request_started_at, + ); + } + return Ok(finished); + } + Err(std::sync::mpsc::RecvTimeoutError::Timeout) => { + if stream_idle_timed_out(self.last_upstream_activity) { + let finished = self.finish_stream(); + if !finished.is_empty() { + mark_first_response_ms_on_usage( + &self.usage_collector, + self.request_started_at, + ); + } + return Ok(finished); + } + if should_emit_keepalive(self.saw_upstream_frame) { + return Ok(SseKeepAliveFrame::OpenAIResponses.bytes().to_vec()); + } + } + } + } + } + + fn process_sse_frame(&mut self, lines: &[String]) -> Vec { + let mut event_name = String::new(); + let mut data_lines = Vec::new(); + for line in lines { + let trimmed = line.trim_end_matches(['\r', '\n']); + if let Some(rest) = trimmed.strip_prefix("event:") { + event_name = rest.trim().to_string(); + } else if let Some(rest) = trimmed.strip_prefix("data:") { + data_lines.push(rest.trim_start().to_string()); + } + } + if data_lines.is_empty() { + return Vec::new(); + } + let data = data_lines.join("\n"); + if data.trim() == "[DONE]" { + return self.finish_stream(); + } + let value = match serde_json::from_str::(&data) { + Ok(value) => value, + Err(_) => return Vec::new(), + }; + self.consume_anthropic_event(event_name.as_str(), &value) + } + + fn consume_anthropic_event(&mut self, event_name: &str, value: &Value) -> Vec { + let event_type = value + .get("type") + .and_then(Value::as_str) + .unwrap_or(event_name); + let mut out = String::new(); + match event_type { + "message_start" => { + if let Some(message) = value.get("message").and_then(Value::as_object) { + self.capture_message_start(message); + } + self.ensure_response_started(&mut out); + } + "content_block_start" => { + self.ensure_response_started(&mut out); + if let Some(block) = value.get("content_block").and_then(Value::as_object) { + self.start_content_block(block); + } + } + "content_block_delta" => { + self.ensure_response_started(&mut out); + if let Some(delta) = value.get("delta").and_then(Value::as_object) { + self.consume_content_delta(delta, &mut out); + } + } + "content_block_stop" => { + self.finish_tool_block(&mut out); + } + "message_delta" => { + if let Some(delta) = value.get("delta").and_then(Value::as_object) { + if let Some(stop_reason) = delta.get("stop_reason").and_then(Value::as_str) { + self.state.stop_reason = stop_reason.to_string(); + } + } + if let Some(usage) = value.get("usage").and_then(Value::as_object) { + self.capture_usage(usage); + } + } + "message_stop" => { + out.push_str(String::from_utf8_lossy(&self.finish_stream()).as_ref()); + } + _ => {} + } + out.into_bytes() + } + + fn capture_message_start(&mut self, message: &Map) { + if let Some(id) = message.get("id").and_then(Value::as_str) { + self.state.response_id = Some(id.to_string()); + } + if let Some(model) = message.get("model").and_then(Value::as_str) { + self.state.model = Some(model.to_string()); + } + if let Some(stop_reason) = message.get("stop_reason").and_then(Value::as_str) { + self.state.stop_reason = stop_reason.to_string(); + } + if let Some(usage) = message.get("usage").and_then(Value::as_object) { + self.capture_usage(usage); + } + } + + fn capture_usage(&mut self, usage: &Map) { + if let Some(value) = usage.get("input_tokens").and_then(Value::as_i64) { + self.state.input_tokens = value; + } + if let Some(value) = usage.get("cache_read_input_tokens").and_then(Value::as_i64) { + self.state.cached_input_tokens = value; + } + if let Some(value) = usage.get("output_tokens").and_then(Value::as_i64) { + self.state.output_tokens = value; + } + if let Some(value) = usage.get("reasoning_output_tokens").and_then(Value::as_i64) { + self.state.reasoning_output_tokens = value; + } + self.state.total_tokens = Some( + self.state.input_tokens + self.state.cached_input_tokens + self.state.output_tokens, + ); + } + + fn start_content_block(&mut self, block: &Map) { + if block + .get("type") + .and_then(Value::as_str) + .is_some_and(|kind| kind == "tool_use") + { + let id = block + .get("id") + .and_then(Value::as_str) + .unwrap_or("toolu_unknown") + .to_string(); + let name = block + .get("name") + .and_then(Value::as_str) + .unwrap_or("tool") + .to_string(); + let input_json = block.get("input").cloned().unwrap_or_else(|| json!({})); + let input_json = if input_json.as_object().is_some_and(Map::is_empty) { + String::new() + } else { + input_json.to_string() + }; + self.state.current_tool = Some(PendingToolUse { + id, + name, + input_json, + }); + } + } + + fn consume_content_delta(&mut self, delta: &Map, out: &mut String) { + match delta + .get("type") + .and_then(Value::as_str) + .unwrap_or_default() + { + "text_delta" => { + let fragment = delta + .get("text") + .and_then(Value::as_str) + .unwrap_or_default(); + if fragment.is_empty() { + return; + } + append_output_text(&mut self.state.output_text, fragment); + self.ensure_text_part_started(out); + append_sse_event( + out, + "response.output_text.delta", + &json!({ + "type": "response.output_text.delta", + "delta": fragment, + "item_id": self.text_item_id(), + "output_index": 0, + "content_index": 0, + }), + ); + } + "input_json_delta" => { + if let Some(partial) = delta.get("partial_json").and_then(Value::as_str) { + if let Some(tool) = self.state.current_tool.as_mut() { + tool.input_json.push_str(partial); + } + } + } + _ => {} + } + } + + fn ensure_response_started(&mut self, out: &mut String) { + if self.state.started { + return; + } + self.state.started = true; + let response = self.response_payload("in_progress"); + append_sse_event( + out, + "response.created", + &json!({ + "type": "response.created", + "response": response, + }), + ); + append_sse_event( + out, + "response.in_progress", + &json!({ + "type": "response.in_progress", + "response": self.response_payload("in_progress"), + }), + ); + } + + fn ensure_text_part_started(&mut self, out: &mut String) { + self.ensure_response_started(out); + if !self.state.text_item_started { + self.state.text_item_started = true; + append_sse_event( + out, + "response.output_item.added", + &json!({ + "type": "response.output_item.added", + "output_index": 0, + "item": { + "id": self.text_item_id(), + "type": "message", + "status": "in_progress", + "role": "assistant", + "content": [], + } + }), + ); + } + if !self.state.text_part_started { + self.state.text_part_started = true; + append_sse_event( + out, + "response.content_part.added", + &json!({ + "type": "response.content_part.added", + "item_id": self.text_item_id(), + "output_index": 0, + "content_index": 0, + "part": { "type": "output_text", "text": "" }, + }), + ); + } + } + + fn finish_text_item(&mut self, out: &mut String) { + if !self.state.text_part_started || self.state.text_finished { + return; + } + self.state.text_finished = true; + append_sse_event( + out, + "response.output_text.done", + &json!({ + "type": "response.output_text.done", + "text": self.state.output_text, + "item_id": self.text_item_id(), + "output_index": 0, + "content_index": 0, + }), + ); + append_sse_event( + out, + "response.content_part.done", + &json!({ + "type": "response.content_part.done", + "item_id": self.text_item_id(), + "output_index": 0, + "content_index": 0, + "part": { "type": "output_text", "text": self.state.output_text }, + }), + ); + append_sse_event( + out, + "response.output_item.done", + &json!({ + "type": "response.output_item.done", + "output_index": 0, + "item": { + "id": self.text_item_id(), + "type": "message", + "status": "completed", + "role": "assistant", + "content": [{ "type": "output_text", "text": self.state.output_text }], + } + }), + ); + } + + fn finish_tool_block(&mut self, out: &mut String) { + let Some(tool) = self.state.current_tool.take() else { + return; + }; + self.state.stop_reason = "tool_use".to_string(); + let output_index = self.completed_output_len(); + let item = json!({ + "id": tool.id, + "type": "function_call", + "status": "completed", + "call_id": tool.id, + "name": tool.name, + "arguments": normalize_json_fragment(tool.input_json.as_str()), + }); + append_sse_event( + out, + "response.output_item.added", + &json!({ + "type": "response.output_item.added", + "output_index": output_index, + "item": item.clone(), + }), + ); + append_sse_event( + out, + "response.output_item.done", + &json!({ + "type": "response.output_item.done", + "output_index": output_index, + "item": item.clone(), + }), + ); + self.state.completed_tools.push(item); + } + + fn finish_stream(&mut self) -> Vec { + if self.state.completed { + return Vec::new(); + } + let mut out = String::new(); + self.ensure_response_started(&mut out); + self.finish_text_item(&mut out); + self.finish_tool_block(&mut out); + self.state.completed = true; + self.publish_usage(); + append_sse_event( + &mut out, + "response.completed", + &json!({ + "type": "response.completed", + "response": self.response_payload("completed"), + }), + ); + out.into_bytes() + } + + fn publish_usage(&self) { + if let Ok(mut usage) = self.usage_collector.lock() { + usage.input_tokens = Some(self.state.input_tokens); + usage.cached_input_tokens = Some(self.state.cached_input_tokens); + usage.output_tokens = Some(self.state.output_tokens); + usage.total_tokens = self.state.total_tokens; + usage.reasoning_output_tokens = Some(self.state.reasoning_output_tokens); + if !self.state.output_text.trim().is_empty() { + usage.output_text = Some(self.state.output_text.clone()); + } + } + } + + fn response_payload(&self, status: &str) -> Value { + json!({ + "id": self.response_id(), + "object": "response", + "created_at": 0, + "status": status, + "model": self.model(), + "output": if status == "completed" { self.completed_output() } else { Value::Array(Vec::new()) }, + "usage": self.usage_payload(), + }) + } + + fn completed_output(&self) -> Value { + let mut output = Vec::new(); + if !self.state.output_text.is_empty() { + output.push(json!({ + "id": self.text_item_id(), + "type": "message", + "status": "completed", + "role": "assistant", + "content": [{ "type": "output_text", "text": self.state.output_text }], + })); + } + output.extend(self.state.completed_tools.iter().cloned()); + Value::Array(output) + } + + fn completed_output_len(&self) -> usize { + usize::from(!self.state.output_text.is_empty()) + self.state.completed_tools.len() + } + + fn usage_payload(&self) -> Value { + json!({ + "input_tokens": self.state.input_tokens, + "output_tokens": self.state.output_tokens, + "total_tokens": self + .state + .total_tokens + .unwrap_or(self.state.input_tokens + self.state.output_tokens), + "input_tokens_details": { "cached_tokens": self.state.cached_input_tokens }, + "output_tokens_details": { "reasoning_tokens": self.state.reasoning_output_tokens }, + }) + } + + fn response_id(&self) -> String { + self.state + .response_id + .clone() + .unwrap_or_else(|| "resp_codexmanager".to_string()) + } + + fn text_item_id(&self) -> String { + format!("msg_{}", self.response_id()) + } + + fn model(&self) -> String { + self.state.model.clone().unwrap_or_default() + } +} + +impl Read for ResponsesFromAnthropicSseReader { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + loop { + let n = self.out_cursor.read(buf)?; + if n > 0 { + return Ok(n); + } + let chunk = self.next_chunk()?; + if chunk.is_empty() { + return Ok(0); + } + self.out_cursor = Cursor::new(chunk); + } + } +} + +fn normalize_json_fragment(value: &str) -> String { + if value.trim().is_empty() { + return "{}".to_string(); + } + serde_json::from_str::(value) + .map(|json| json.to_string()) + .unwrap_or_else(|_| value.to_string()) +} + +fn append_sse_event(buffer: &mut String, event: &str, payload: &Value) { + buffer.push_str("event: "); + buffer.push_str(event); + buffer.push('\n'); + buffer.push_str("data: "); + buffer.push_str(payload.to_string().as_str()); + buffer.push_str("\n\n"); +} + +#[cfg(test)] +mod tests { + use super::*; + use std::io::{Cursor, Read}; + + #[test] + fn anthropic_text_sse_maps_to_responses_sse() { + let upstream = concat!( + "event: message_start\n", + "data: {\"type\":\"message_start\",\"message\":{\"id\":\"msg_1\",\"model\":\"deepseek/deepseek-v4-pro\",\"usage\":{\"input_tokens\":3,\"output_tokens\":1}}}\n\n", + "event: content_block_start\n", + "data: {\"type\":\"content_block_start\",\"index\":0,\"content_block\":{\"type\":\"text\",\"text\":\"\"}}\n\n", + "event: content_block_delta\n", + "data: {\"type\":\"content_block_delta\",\"index\":0,\"delta\":{\"type\":\"text_delta\",\"text\":\"hello\"}}\n\n", + "event: message_delta\n", + "data: {\"type\":\"message_delta\",\"delta\":{\"stop_reason\":\"end_turn\"},\"usage\":{\"output_tokens\":2}}\n\n", + "event: message_stop\n", + "data: {\"type\":\"message_stop\"}\n\n", + ); + let usage_collector = Arc::new(Mutex::new(UpstreamResponseUsage::default())); + let mut reader = ResponsesFromAnthropicSseReader::from_reader( + Cursor::new(upstream.as_bytes().to_vec()), + Arc::clone(&usage_collector), + Some("fallback-model"), + Instant::now(), + ); + let mut out = String::new(); + + reader.read_to_string(&mut out).expect("read mapped stream"); + + assert!(out.contains("event: response.created")); + assert!(out.contains("event: response.output_text.delta")); + assert!(out.contains("\"delta\":\"hello\"")); + assert!(out.contains("event: response.completed")); + assert!(out.contains("\"model\":\"deepseek/deepseek-v4-pro\"")); + let usage = usage_collector.lock().expect("usage lock").clone(); + assert_eq!(usage.input_tokens, Some(3)); + assert_eq!(usage.output_tokens, Some(2)); + assert_eq!(usage.output_text.as_deref(), Some("hello")); + } + + #[test] + fn anthropic_tool_use_sse_is_in_completed_responses_output() { + let upstream = concat!( + "event: message_start\n", + "data: {\"type\":\"message_start\",\"message\":{\"id\":\"msg_tool\",\"model\":\"deepseek-v4-pro\",\"usage\":{\"input_tokens\":4}}}\n\n", + "event: content_block_start\n", + "data: {\"type\":\"content_block_start\",\"index\":0,\"content_block\":{\"type\":\"tool_use\",\"id\":\"toolu_1\",\"name\":\"read_file\",\"input\":{}}}\n\n", + "event: content_block_delta\n", + "data: {\"type\":\"content_block_delta\",\"index\":0,\"delta\":{\"type\":\"input_json_delta\",\"partial_json\":\"{\\\"path\\\":\\\"/tmp/a\\\"}\"}}\n\n", + "event: content_block_stop\n", + "data: {\"type\":\"content_block_stop\",\"index\":0}\n\n", + "event: message_stop\n", + "data: {\"type\":\"message_stop\"}\n\n", + ); + let usage_collector = Arc::new(Mutex::new(UpstreamResponseUsage::default())); + let mut reader = ResponsesFromAnthropicSseReader::from_reader( + Cursor::new(upstream.as_bytes().to_vec()), + usage_collector, + Some("fallback-model"), + Instant::now(), + ); + let mut out = String::new(); + + reader.read_to_string(&mut out).expect("read mapped stream"); + + assert!(out.contains("event: response.output_item.done")); + assert!(out.contains("\"type\":\"function_call\"")); + assert!(out.contains("\"name\":\"read_file\"")); + assert!(out.contains("\"output\":[")); + assert!(out.contains("\"id\":\"toolu_1\"")); + assert!(out.contains("\"arguments\":\"{\\\"path\\\":\\\"/tmp/a\\\"}\"")); + } +} diff --git a/crates/service/src/gateway/observability/request_log.rs b/crates/service/src/gateway/observability/request_log.rs index f57e136d9..6ddee44d5 100644 --- a/crates/service/src/gateway/observability/request_log.rs +++ b/crates/service/src/gateway/observability/request_log.rs @@ -306,6 +306,7 @@ fn response_adapter_label(value: super::ResponseAdapter) -> &'static str { match value { super::ResponseAdapter::Passthrough => "Passthrough", super::ResponseAdapter::AnthropicMessagesFromResponses => "AnthropicMessagesFromResponses", + super::ResponseAdapter::ResponsesFromAnthropicMessages => "ResponsesFromAnthropicMessages", super::ResponseAdapter::ChatCompletionsFromResponses => "ChatCompletionsFromResponses", super::ResponseAdapter::CompactFromChatCompletions => "CompactFromChatCompletions", super::ResponseAdapter::ImagesB64JsonFromResponses => "ImagesB64JsonFromResponses", diff --git a/crates/service/src/gateway/protocol_adapter/mod.rs b/crates/service/src/gateway/protocol_adapter/mod.rs index 22efeda95..9900baa7d 100644 --- a/crates/service/src/gateway/protocol_adapter/mod.rs +++ b/crates/service/src/gateway/protocol_adapter/mod.rs @@ -1,7 +1,9 @@ mod request_router; mod types; -pub(super) use self::request_router::adapt_request_for_protocol; +pub(super) use self::request_router::{ + adapt_openai_responses_to_anthropic_messages, adapt_request_for_protocol, +}; pub(super) use self::types::{ AdaptedGatewayRequest, GeminiStreamOutputMode, ResponseAdapter, ToolNameRestoreMap, }; diff --git a/crates/service/src/gateway/protocol_adapter/request_router.rs b/crates/service/src/gateway/protocol_adapter/request_router.rs index d97df2e7d..b930bfe09 100644 --- a/crates/service/src/gateway/protocol_adapter/request_router.rs +++ b/crates/service/src/gateway/protocol_adapter/request_router.rs @@ -29,6 +29,78 @@ pub(crate) fn adapt_request_for_protocol( }) } +pub(crate) fn adapt_openai_responses_to_anthropic_messages( + body: &[u8], + model_override: Option<&str>, +) -> Result, String> { + let payload = serde_json::from_slice::(body) + .map_err(|err| format!("invalid responses request json: {err}"))?; + let obj = payload + .as_object() + .ok_or_else(|| "responses request body must be an object".to_string())?; + + let model = model_override + .and_then(normalize_text) + .or_else(|| { + obj.get("model") + .and_then(Value::as_str) + .and_then(normalize_text) + }) + .ok_or_else(|| "responses request model is required for anthropic upstream".to_string())?; + + let mut rewritten = Map::new(); + rewritten.insert("model".to_string(), Value::String(model)); + rewritten.insert( + "stream".to_string(), + Value::Bool(obj.get("stream").and_then(Value::as_bool).unwrap_or(true)), + ); + let max_tokens = obj + .get("max_output_tokens") + .or_else(|| obj.get("max_tokens")) + .and_then(Value::as_i64) + .filter(|value| *value > 0) + .unwrap_or(4096); + rewritten.insert("max_tokens".to_string(), Value::from(max_tokens)); + + let mut system_parts = Vec::new(); + if let Some(instructions) = obj + .get("instructions") + .and_then(Value::as_str) + .and_then(normalize_system_text) + { + system_parts.push(instructions); + } + + let mut messages = Vec::new(); + responses_input_to_anthropic_messages(obj.get("input"), &mut system_parts, &mut messages)?; + if messages.is_empty() { + messages.push(json!({ + "role": "user", + "content": [{ "type": "text", "text": "" }], + })); + } + rewritten.insert("messages".to_string(), Value::Array(messages)); + + if !system_parts.is_empty() { + rewritten.insert( + "system".to_string(), + Value::String(system_parts.join("\n\n")), + ); + } + if let Some(tools) = responses_tools_to_anthropic(obj.get("tools"))? { + rewritten.insert("tools".to_string(), tools); + if let Some(tool_choice) = responses_tool_choice_to_anthropic(obj.get("tool_choice")) { + rewritten.insert("tool_choice".to_string(), tool_choice); + } + } + if let Some(thinking) = responses_reasoning_to_anthropic(obj.get("reasoning"), max_tokens) { + rewritten.insert("thinking".to_string(), thinking); + } + + serde_json::to_vec(&Value::Object(rewritten)) + .map_err(|err| format!("serialize anthropic request failed: {err}")) +} + fn adapt_anthropic_messages_request( _path: &str, body: Vec, @@ -339,6 +411,281 @@ fn responses_message(role: &str, content: Value) -> Value { }) } +fn responses_input_to_anthropic_messages( + input: Option<&Value>, + system_parts: &mut Vec, + messages: &mut Vec, +) -> Result<(), String> { + match input { + Some(Value::String(text)) => { + messages.push(anthropic_message("user", vec![anthropic_text_block(text)])); + } + Some(Value::Array(items)) => { + for item in items { + responses_input_item_to_anthropic(item, system_parts, messages)?; + } + } + Some(Value::Object(_)) => { + responses_input_item_to_anthropic(input.unwrap(), system_parts, messages)?; + } + Some(_) | None => {} + } + Ok(()) +} + +fn responses_input_item_to_anthropic( + item: &Value, + system_parts: &mut Vec, + messages: &mut Vec, +) -> Result<(), String> { + let Some(obj) = item.as_object() else { + return Ok(()); + }; + match obj.get("type").and_then(Value::as_str).unwrap_or("message") { + "message" => { + let role = obj + .get("role") + .and_then(Value::as_str) + .map(str::trim) + .unwrap_or("user"); + let content = responses_message_content_to_anthropic(obj.get("content"))?; + if matches!(role, "developer" | "system") { + let text = anthropic_content_to_text(&content); + if !text.trim().is_empty() { + system_parts.push(text); + } + } else if !content.is_empty() { + messages.push(anthropic_message( + if role == "assistant" { + "assistant" + } else { + "user" + }, + content, + )); + } + } + "function_call" | "custom_tool_call" => { + let name = obj + .get("name") + .and_then(Value::as_str) + .and_then(normalize_text) + .unwrap_or_else(|| "tool".to_string()); + let id = obj + .get("call_id") + .or_else(|| obj.get("id")) + .and_then(Value::as_str) + .and_then(normalize_text) + .unwrap_or_else(generate_tool_call_id); + let input = + parse_json_string_or_value(obj.get("arguments").or_else(|| obj.get("input"))); + messages.push(anthropic_message( + "assistant", + vec![json!({ + "type": "tool_use", + "id": id, + "name": name, + "input": input, + })], + )); + } + "function_call_output" => { + let call_id = obj + .get("call_id") + .and_then(Value::as_str) + .and_then(normalize_text) + .unwrap_or_else(generate_tool_call_id); + let content = obj + .get("output") + .cloned() + .unwrap_or(Value::String(String::new())); + messages.push(anthropic_message( + "user", + vec![json!({ + "type": "tool_result", + "tool_use_id": call_id, + "content": anthropic_tool_result_content(content), + })], + )); + } + _ => {} + } + Ok(()) +} + +fn responses_message_content_to_anthropic(content: Option<&Value>) -> Result, String> { + match content { + Some(Value::String(text)) => Ok(vec![anthropic_text_block(text)]), + Some(Value::Array(parts)) => { + let mut out = Vec::new(); + for part in parts { + if let Some(mapped) = responses_content_part_to_anthropic(part)? { + out.push(mapped); + } + } + Ok(out) + } + Some(Value::Object(_)) => Ok(responses_content_part_to_anthropic(content.unwrap())? + .map(|part| vec![part]) + .unwrap_or_default()), + Some(other) => Ok(vec![anthropic_text_block(other.to_string().as_str())]), + None => Ok(Vec::new()), + } +} + +fn responses_content_part_to_anthropic(part: &Value) -> Result, String> { + let Some(obj) = part.as_object() else { + return Ok(part.as_str().map(anthropic_text_block)); + }; + let kind = obj.get("type").and_then(Value::as_str).unwrap_or("text"); + match kind { + "input_text" | "output_text" | "text" => Ok(obj + .get("text") + .and_then(Value::as_str) + .map(anthropic_text_block)), + "input_image" | "image" => Ok(None), + _ => Ok(None), + } +} + +fn anthropic_message(role: &str, content: Vec) -> Value { + json!({ + "role": role, + "content": content, + }) +} + +fn anthropic_text_block(text: &str) -> Value { + json!({ + "type": "text", + "text": text, + }) +} + +fn anthropic_content_to_text(content: &[Value]) -> String { + content + .iter() + .filter_map(|item| item.get("text").and_then(Value::as_str)) + .collect::>() + .join("\n\n") +} + +fn anthropic_tool_result_content(value: Value) -> Value { + match value { + Value::Array(_) => value, + Value::String(text) => Value::String(text), + Value::Null => Value::String(String::new()), + other => Value::String(other.to_string()), + } +} + +fn responses_tools_to_anthropic(tools: Option<&Value>) -> Result, String> { + let Some(tools) = tools else { + return Ok(None); + }; + let items = tools + .as_array() + .ok_or_else(|| "responses tools must be an array".to_string())?; + let mut out = Vec::new(); + for item in items { + let Some(tool) = item.as_object() else { + continue; + }; + if tool + .get("type") + .and_then(Value::as_str) + .is_some_and(|kind| kind == "web_search") + { + out.push(json!({ "type": "web_search_20250305" })); + continue; + } + if tool + .get("type") + .and_then(Value::as_str) + .is_some_and(|kind| kind != "function") + { + continue; + } + let Some(name) = tool + .get("name") + .and_then(Value::as_str) + .and_then(normalize_text) + else { + continue; + }; + let mut mapped = Map::new(); + mapped.insert("name".to_string(), Value::String(name)); + if let Some(description) = tool + .get("description") + .and_then(Value::as_str) + .and_then(normalize_text) + { + mapped.insert("description".to_string(), Value::String(description)); + } + mapped.insert( + "input_schema".to_string(), + tool.get("parameters") + .cloned() + .unwrap_or_else(|| json!({ "type": "object", "properties": {} })), + ); + out.push(Value::Object(mapped)); + } + Ok((!out.is_empty()).then(|| Value::Array(out))) +} + +fn responses_tool_choice_to_anthropic(tool_choice: Option<&Value>) -> Option { + match tool_choice { + Some(Value::String(value)) if value == "none" => Some(json!({ "type": "none" })), + Some(Value::String(value)) if value == "required" => Some(json!({ "type": "any" })), + Some(Value::Object(obj)) => obj + .get("name") + .or_else(|| { + obj.get("function") + .and_then(Value::as_object) + .and_then(|function| function.get("name")) + }) + .and_then(Value::as_str) + .and_then(normalize_text) + .map(|name| json!({ "type": "tool", "name": name })), + _ => Some(json!({ "type": "auto" })), + } +} + +fn responses_reasoning_to_anthropic(reasoning: Option<&Value>, max_tokens: i64) -> Option { + let obj = reasoning?.as_object()?; + let effort = obj + .get("effort") + .and_then(Value::as_str) + .map(str::trim) + .unwrap_or("medium"); + if effort.eq_ignore_ascii_case("none") { + return Some(json!({ "type": "disabled" })); + } + if max_tokens <= 1024 { + return None; + } + let budget = match effort.to_ascii_lowercase().as_str() { + "minimal" | "low" => 1024, + "high" | "xhigh" => 8192, + _ => 4096, + } + .min(max_tokens - 1); + Some(json!({ + "type": "enabled", + "budget_tokens": budget, + })) +} + +fn parse_json_string_or_value(value: Option<&Value>) -> Value { + match value { + Some(Value::String(text)) => { + serde_json::from_str::(text).unwrap_or_else(|_| Value::String(text.clone())) + } + Some(value) => value.clone(), + None => Value::Object(Map::new()), + } +} + fn anthropic_system_to_developer_message(system: Option<&Value>) -> Option { anthropic_system_to_text(system).map(|text| { json!({ @@ -1286,8 +1633,8 @@ fn generate_tool_call_id() -> String { #[cfg(test)] mod tests { use super::{ - adapt_request_for_protocol, backfill_empty_gemini_function_response_names, - gemini_function_response_output, + adapt_openai_responses_to_anthropic_messages, adapt_request_for_protocol, + backfill_empty_gemini_function_response_names, gemini_function_response_output, }; use crate::apikey_profile::{PROTOCOL_ANTHROPIC_NATIVE, PROTOCOL_GEMINI_NATIVE}; use crate::gateway::{GeminiStreamOutputMode, ResponseAdapter}; @@ -1896,6 +2243,122 @@ mod tests { assert_eq!(payload["include"][0], "reasoning.encrypted_content"); } + #[test] + fn responses_request_rewrites_to_anthropic_messages_for_claude_upstream() { + let body = json!({ + "model": "gpt-5.3", + "instructions": "be direct", + "input": [ + { + "type": "message", + "role": "user", + "content": [{ "type": "input_text", "text": "hello" }] + }, + { + "type": "function_call", + "call_id": "call_1", + "name": "read_file", + "arguments": "{\"path\":\"/tmp/a\"}" + }, + { + "type": "function_call_output", + "call_id": "call_1", + "output": "done" + } + ], + "tools": [{ + "type": "function", + "name": "read_file", + "description": "Read a file", + "parameters": { + "type": "object", + "properties": { "path": { "type": "string" } } + } + }], + "reasoning": { "effort": "high" }, + "stream": true + }); + + let adapted = adapt_openai_responses_to_anthropic_messages( + serde_json::to_vec(&body).expect("body").as_slice(), + Some("deepseek/deepseek-v4-pro"), + ) + .expect("adapt responses request"); + let payload: Value = serde_json::from_slice(&adapted).expect("parse adapted body"); + + assert_eq!(payload["model"], "deepseek/deepseek-v4-pro"); + assert_eq!(payload["system"], "be direct"); + assert_eq!(payload["stream"], true); + assert_eq!(payload["messages"][0]["role"], "user"); + assert_eq!(payload["messages"][0]["content"][0]["type"], "text"); + assert_eq!(payload["messages"][0]["content"][0]["text"], "hello"); + assert_eq!(payload["messages"][1]["role"], "assistant"); + assert_eq!(payload["messages"][1]["content"][0]["type"], "tool_use"); + assert_eq!(payload["messages"][1]["content"][0]["id"], "call_1"); + assert_eq!( + payload["messages"][1]["content"][0]["input"]["path"], + "/tmp/a" + ); + assert_eq!(payload["messages"][2]["role"], "user"); + assert_eq!(payload["messages"][2]["content"][0]["type"], "tool_result"); + assert_eq!(payload["tools"][0]["name"], "read_file"); + assert_eq!(payload["thinking"]["type"], "enabled"); + } + + #[test] + fn responses_reasoning_budget_stays_below_max_tokens() { + let body = json!({ + "model": "gpt-5.3", + "max_output_tokens": 2048, + "input": "think carefully", + "reasoning": { "effort": "high" } + }); + + let adapted = adapt_openai_responses_to_anthropic_messages( + serde_json::to_vec(&body).expect("body").as_slice(), + Some("claude-sonnet-4"), + ) + .expect("adapt responses request"); + let payload: Value = serde_json::from_slice(&adapted).expect("parse adapted body"); + + let max_tokens = payload["max_tokens"].as_i64().expect("max tokens"); + let budget_tokens = payload["thinking"]["budget_tokens"] + .as_i64() + .expect("budget tokens"); + assert!( + budget_tokens < max_tokens, + "Anthropic thinking budget_tokens must be lower than max_tokens" + ); + } + + #[test] + fn responses_request_drops_images_for_deepseek_anthropic_bridge() { + let body = json!({ + "model": "gpt-5.3", + "input": [{ + "type": "message", + "role": "user", + "content": [ + { "type": "input_text", "text": "describe this" }, + { "type": "input_image", "image_url": "data:image/png;base64,aGVsbG8=" } + ] + }] + }); + + let adapted = adapt_openai_responses_to_anthropic_messages( + serde_json::to_vec(&body).expect("body").as_slice(), + Some("deepseek-v4-pro"), + ) + .expect("adapt responses request"); + let payload: Value = serde_json::from_slice(&adapted).expect("parse adapted body"); + + assert_eq!( + payload["messages"][0]["content"].as_array().unwrap().len(), + 1 + ); + assert_eq!(payload["messages"][0]["content"][0]["type"], "text"); + } + #[test] fn gemini_thinking_config_adds_reasoning_and_include_only_when_explicit() { let body = serde_json::json!({ diff --git a/crates/service/src/gateway/protocol_adapter/types.rs b/crates/service/src/gateway/protocol_adapter/types.rs index 25b6efff8..c50b6d3e3 100644 --- a/crates/service/src/gateway/protocol_adapter/types.rs +++ b/crates/service/src/gateway/protocol_adapter/types.rs @@ -4,6 +4,7 @@ use std::collections::BTreeMap; pub(crate) enum ResponseAdapter { Passthrough, AnthropicMessagesFromResponses, + ResponsesFromAnthropicMessages, ChatCompletionsFromResponses, #[allow(dead_code)] CompactFromChatCompletions, diff --git a/crates/service/src/gateway/upstream/protocol/aggregate_api.rs b/crates/service/src/gateway/upstream/protocol/aggregate_api.rs index 0f98d8105..730ede4c8 100644 --- a/crates/service/src/gateway/upstream/protocol/aggregate_api.rs +++ b/crates/service/src/gateway/upstream/protocol/aggregate_api.rs @@ -11,6 +11,7 @@ use crate::aggregate_api::{ AGGREGATE_API_AUTH_APIKEY, AGGREGATE_API_AUTH_USERPASS, AGGREGATE_API_PROVIDER_CLAUDE, AGGREGATE_API_PROVIDER_CODEX, AGGREGATE_API_PROVIDER_GEMINI, }; +use crate::gateway::protocol_adapter::adapt_openai_responses_to_anthropic_messages; use crate::gateway::request_log::RequestLogUsage; use serde_json::Value; @@ -146,6 +147,27 @@ fn aggregate_upstream_model_for_log<'a>( candidate.model_override.as_deref().or(platform_model) } +fn should_bridge_responses_to_anthropic(candidate: &AggregateApi, path: &str) -> bool { + normalize_provider_type_value(candidate.provider_type.as_str()) == AGGREGATE_API_PROVIDER_CLAUDE + && (path == "/v1/responses" || path.starts_with("/v1/responses?")) +} + +fn responses_to_anthropic_messages_action_path(candidate: &AggregateApi, path: &str) -> String { + if candidate.action.is_some() { + return effective_action_path(candidate, path); + } + + let base_path = reqwest::Url::parse(candidate.url.as_str()) + .ok() + .map(|url| url.path().trim_end_matches('/').to_string()) + .unwrap_or_default(); + if base_path == "/v1" || base_path.ends_with("/v1") { + "/messages".to_string() + } else { + "/v1/messages".to_string() + } +} + fn replace_query_param(mut url: reqwest::Url, name: &str, value: &str) -> reqwest::Url { let name_trimmed = name.trim(); if name_trimmed.is_empty() { @@ -652,6 +674,44 @@ fn build_aggregate_api_request( Ok(builder) } +fn build_anthropic_bridge_aggregate_api_request( + client: &reqwest::blocking::Client, + request: &Request, + method: &reqwest::Method, + url: reqwest::Url, + body: &Bytes, + secret: &str, + auth_config: &AggregateApiAuthConfig, + injected_headers: &HashSet, + request_deadline: Option, + is_stream: bool, +) -> Result { + let mut builder = build_aggregate_api_request( + client, + request, + method, + url, + body, + secret, + auth_config, + injected_headers, + request_deadline, + is_stream, + )?; + builder = builder.header( + HeaderName::from_static("anthropic-version"), + HeaderValue::from_static("2023-06-01"), + ); + if matches!(auth_config, AggregateApiAuthConfig::ApiKeyDefaultBearer) { + builder = builder.header( + HeaderName::from_static("x-api-key"), + HeaderValue::from_str(secret.trim()) + .map_err(|_| "invalid aggregate api secret".to_string())?, + ); + } + Ok(builder) +} + /// 函数 `resolve_aggregate_api_rotation_candidates` /// /// 作者: gaohongshun @@ -809,7 +869,17 @@ pub(in super::super) fn proxy_aggregate_request( continue; }; - let effective_path = effective_action_path(&candidate, path); + let bridge_responses_to_anthropic = should_bridge_responses_to_anthropic(&candidate, path); + let effective_path = if bridge_responses_to_anthropic { + responses_to_anthropic_messages_action_path(&candidate, path) + } else { + effective_action_path(&candidate, path) + }; + let response_adapter_for_candidate = if bridge_responses_to_anthropic { + super::super::super::ResponseAdapter::ResponsesFromAnthropicMessages + } else { + response_adapter + }; let (auth_config, injected_headers) = match parse_auth_config(&candidate) { Ok(value) => value, Err(err) => { @@ -904,18 +974,44 @@ pub(in super::super) fn proxy_aggregate_request( _ => {} } - let builder = build_aggregate_api_request( - &client, - request.as_ref().expect("request should still be available"), - method, - url.clone(), - &rewrite_body_model_override(body, candidate.model_override.as_deref()), - secret.as_str(), - &auth_config, - &injected_headers, - request_deadline, - is_stream, - )?; + let rewritten_body = + rewrite_body_model_override(body, candidate.model_override.as_deref()); + let upstream_body = if bridge_responses_to_anthropic { + Bytes::from(adapt_openai_responses_to_anthropic_messages( + rewritten_body.as_ref(), + candidate.model_override.as_deref(), + )?) + } else { + rewritten_body + }; + + let builder = if bridge_responses_to_anthropic { + build_anthropic_bridge_aggregate_api_request( + &client, + request.as_ref().expect("request should still be available"), + method, + url.clone(), + &upstream_body, + secret.as_str(), + &auth_config, + &injected_headers, + request_deadline, + is_stream, + )? + } else { + build_aggregate_api_request( + &client, + request.as_ref().expect("request should still be available"), + method, + url.clone(), + &upstream_body, + secret.as_str(), + &auth_config, + &injected_headers, + request_deadline, + is_stream, + )? + }; let attempt_started_at = Instant::now(); let upstream = match builder.send() { @@ -980,14 +1076,15 @@ pub(in super::super) fn proxy_aggregate_request( } let inflight_guard = super::super::super::acquire_account_inflight(key_id); - let passthrough_sse_protocol = resolve_passthrough_sse_protocol(path, response_adapter); + let passthrough_sse_protocol = + resolve_passthrough_sse_protocol(path, response_adapter_for_candidate); let bridge = super::super::super::respond_with_upstream( request .take() .expect("request should be available before bridge"), GatewayUpstreamResponse::Blocking(upstream), inflight_guard, - response_adapter, + response_adapter_for_candidate, passthrough_sse_protocol, None, path, @@ -1008,7 +1105,7 @@ pub(in super::super) fn proxy_aggregate_request( super::super::super::trace_log::log_bridge_result( super::super::super::trace_log::BridgeResultLog { trace_id, - adapter: format!("{response_adapter:?}").as_str(), + adapter: format!("{response_adapter_for_candidate:?}").as_str(), path, is_stream, stream_terminal_seen: bridge.stream_terminal_seen, @@ -1066,7 +1163,7 @@ pub(in super::super) fn proxy_aggregate_request( original_path: Some(original_path), adapted_path: Some(path), gateway_mode: gateway_mode_for_log, - response_adapter: Some(response_adapter), + response_adapter: Some(response_adapter_for_candidate), effective_service_tier: effective_service_tier_for_log, aggregate_api_supplier_name: candidate_supplier_name.as_deref(), aggregate_api_url: Some(candidate_url.as_str()), @@ -1342,8 +1439,9 @@ mod tests { use codexmanager_core::storage::{now_ts, AggregateApi, Storage}; use super::{ - build_upstream_url, effective_action_path, resolve_aggregate_api_rotation_candidates, - resolve_passthrough_sse_protocol, rewrite_body_model_override, + build_anthropic_bridge_aggregate_api_request, build_upstream_url, effective_action_path, + resolve_aggregate_api_rotation_candidates, resolve_passthrough_sse_protocol, + responses_to_anthropic_messages_action_path, rewrite_body_model_override, }; use crate::aggregate_api::{ AGGREGATE_API_AUTH_APIKEY, AGGREGATE_API_PROVIDER_CLAUDE, AGGREGATE_API_PROVIDER_CODEX, @@ -1429,6 +1527,105 @@ mod tests { ); } + #[test] + fn responses_bridge_uses_messages_suffix_for_anthropic_v1_base_url() { + let mut api = aggregate_api_with_action(None); + api.url = "https://api.anthropic.com/v1".to_string(); + + let path = responses_to_anthropic_messages_action_path(&api, "/v1/responses"); + let url = build_upstream_url(api.url.as_str(), path.as_str()).expect("build upstream url"); + + assert_eq!(url.as_str(), "https://api.anthropic.com/v1/messages"); + } + + #[test] + fn responses_bridge_keeps_v1_messages_for_deepseek_anthropic_base_url() { + let mut api = aggregate_api_with_action(None); + api.url = "https://api.deepseek.com/anthropic".to_string(); + + let path = responses_to_anthropic_messages_action_path(&api, "/v1/responses"); + let url = build_upstream_url(api.url.as_str(), path.as_str()).expect("build upstream url"); + + assert_eq!( + url.as_str(), + "https://api.deepseek.com/anthropic/v1/messages" + ); + } + + #[test] + fn responses_bridge_respects_custom_action_path() { + let mut api = aggregate_api_with_action(Some("/messages?beta=true")); + api.url = "https://api.anthropic.com/v1".to_string(); + + let path = responses_to_anthropic_messages_action_path(&api, "/v1/responses"); + let url = build_upstream_url(api.url.as_str(), path.as_str()).expect("build upstream url"); + + assert_eq!( + path.as_str(), + "/messages?beta=true", + "custom action should remain the upstream bridge action" + ); + assert_eq!( + url.as_str(), + "https://api.anthropic.com/v1/messages?beta=true" + ); + } + + #[test] + fn anthropic_bridge_request_adds_required_messages_headers_with_default_auth() { + let request: tiny_http::Request = tiny_http::TestRequest::new() + .with_header( + tiny_http::Header::from_bytes("Authorization", "Bearer client-key") + .expect("auth header"), + ) + .into(); + let client = reqwest::blocking::Client::new(); + let builder = build_anthropic_bridge_aggregate_api_request( + &client, + &request, + &reqwest::Method::POST, + reqwest::Url::parse("https://api.anthropic.com/v1/messages").expect("url"), + &Bytes::from_static(br#"{"model":"claude-sonnet","messages":[]}"#), + "sk-ant-test", + &crate::gateway::upstream::protocol::aggregate_api::AggregateApiAuthConfig::ApiKeyDefaultBearer, + &std::collections::HashSet::new(), + None, + true, + ) + .expect("build request") + .build() + .expect("finalize request"); + + assert_eq!( + builder + .headers() + .get("authorization") + .and_then(|value| value.to_str().ok()), + Some("Bearer sk-ant-test") + ); + assert_eq!( + builder + .headers() + .get("x-api-key") + .and_then(|value| value.to_str().ok()), + Some("sk-ant-test") + ); + assert_eq!( + builder + .headers() + .get("anthropic-version") + .and_then(|value| value.to_str().ok()), + Some("2023-06-01") + ); + assert_eq!( + builder + .headers() + .get("accept") + .and_then(|value| value.to_str().ok()), + Some("text/event-stream") + ); + } + #[test] fn rewrite_body_model_override_replaces_json_model() { let body = Bytes::from_static(br#"{"model":"claude-sonnet","messages":[]}"#); diff --git a/crates/service/tests/gateway_logs/anthropic.rs b/crates/service/tests/gateway_logs/anthropic.rs index b09a07faf..b2895e1a0 100644 --- a/crates/service/tests/gateway_logs/anthropic.rs +++ b/crates/service/tests/gateway_logs/anthropic.rs @@ -321,6 +321,169 @@ fn gateway_aggregate_messages_passthrough_accepts_message_stop_for_non_claude_pr assert_eq!(log.actual_source_id.as_deref(), Some(aggregate_id)); } +#[test] +fn gateway_aggregate_responses_bridge_adds_anthropic_headers_and_messages_path() { + let _lock = test_env_guard(); + let dir = new_test_dir("codexmanager-gateway-responses-anthropic-bridge-headers"); + let db_path: PathBuf = dir.join("codexmanager.db"); + let _db_guard = EnvGuard::set("CODEXMANAGER_DB_PATH", db_path.to_string_lossy().as_ref()); + + let anthropic_response = serde_json::json!({ + "id": "msg_bridge_headers", + "type": "message", + "role": "assistant", + "model": "claude-3-5-sonnet-20241022", + "content": [{ "type": "text", "text": "ok" }], + "stop_reason": "end_turn", + "stop_sequence": null, + "usage": { "input_tokens": 4, "output_tokens": 1 } + }); + let (upstream_addr, upstream_rx, upstream_join) = + start_mock_upstream_sequence_lenient_with_content_types( + vec![( + 200, + serde_json::to_string(&anthropic_response).expect("serialize upstream response"), + "application/json".to_string(), + )], + Duration::from_secs(3), + ); + + let storage = Storage::open(&db_path).expect("open db"); + storage.init().expect("init db"); + seed_model_catalog_models(&storage, &["claude-3-5-sonnet-20241022"]); + let now = now_ts(); + let aggregate_id = "agg_claude_responses_bridge_headers"; + storage + .insert_aggregate_api(&AggregateApi { + id: aggregate_id.to_string(), + provider_type: "claude".to_string(), + supplier_name: Some("claude-anthropic-compatible".to_string()), + sort: 0, + url: format!("http://{upstream_addr}/v1"), + auth_type: "apikey".to_string(), + auth_params_json: None, + action: None, + model_override: None, + status: "active".to_string(), + created_at: now, + updated_at: now, + last_test_at: None, + last_test_status: None, + last_test_error: None, + balance_query_enabled: false, + balance_query_template: None, + balance_query_base_url: None, + balance_query_user_id: None, + balance_query_config_json: None, + last_balance_at: None, + last_balance_status: None, + last_balance_error: None, + last_balance_json: None, + }) + .expect("insert aggregate api"); + storage + .upsert_aggregate_api_secret(aggregate_id, "upstream-secret") + .expect("insert aggregate secret"); + storage + .upsert_model_source_model(&ModelSourceModel { + source_kind: "aggregate_api".to_string(), + source_id: aggregate_id.to_string(), + upstream_model: "claude-3-5-sonnet-20241022".to_string(), + display_name: Some("Claude 3.5 Sonnet".to_string()), + status: "available".to_string(), + discovery_kind: "manual".to_string(), + last_synced_at: Some(now), + extra_json: "{}".to_string(), + created_at: now, + updated_at: now, + }) + .expect("insert aggregate source model"); + storage + .upsert_model_source_mapping(&ModelSourceMapping { + id: "mapping_claude_responses_bridge_headers".to_string(), + platform_model_slug: "claude-3-5-sonnet-20241022".to_string(), + source_kind: "aggregate_api".to_string(), + source_id: aggregate_id.to_string(), + upstream_model: "claude-3-5-sonnet-20241022".to_string(), + enabled: true, + priority: 0, + weight: 1, + billing_model_slug: None, + created_at: now, + updated_at: now, + }) + .expect("insert aggregate source mapping"); + + let platform_key = "pk_claude_responses_bridge_headers"; + storage + .insert_api_key(&ApiKey { + id: "gk_claude_responses_bridge_headers".to_string(), + name: Some("claude-responses-bridge-headers".to_string()), + model_slug: Some("claude-3-5-sonnet-20241022".to_string()), + reasoning_effort: None, + service_tier: None, + rotation_strategy: "aggregate_api_rotation".to_string(), + aggregate_api_id: Some(aggregate_id.to_string()), + account_plan_filter: None, + aggregate_api_url: None, + client_type: "codex".to_string(), + protocol_type: "openai_compat".to_string(), + auth_scheme: "x_api_key".to_string(), + upstream_base_url: None, + static_headers_json: None, + key_hash: hash_platform_key_for_test(platform_key), + status: "active".to_string(), + created_at: now, + last_used_at: None, + }) + .expect("insert api key"); + + let server = codexmanager_service::start_one_shot_server().expect("start server"); + let body = serde_json::json!({ + "model": "claude-3-5-sonnet-20241022", + "input": "hello", + "stream": false + }); + let body = serde_json::to_string(&body).expect("serialize request"); + let (status, response_body) = post_http_raw( + &server.addr, + "/v1/responses", + &body, + &[ + ("Content-Type", "application/json"), + ("x-api-key", platform_key), + ], + ); + server.join(); + assert_eq!(status, 200, "gateway response: {response_body}"); + + let captured = upstream_rx + .recv_timeout(Duration::from_secs(3)) + .expect("receive upstream request"); + upstream_join.join().expect("join mock upstream"); + assert_eq!(captured.path, "/v1/messages"); + assert_eq!( + captured.headers.get("authorization").map(String::as_str), + Some("Bearer upstream-secret") + ); + assert_eq!( + captured.headers.get("x-api-key").map(String::as_str), + Some("upstream-secret") + ); + assert_eq!( + captured + .headers + .get("anthropic-version") + .map(String::as_str), + Some("2023-06-01") + ); + + let upstream_body = decode_upstream_request_body(&captured); + let upstream_payload: serde_json::Value = + serde_json::from_slice(&upstream_body).expect("parse upstream payload"); + assert_eq!(upstream_payload["messages"][0]["role"], "user"); +} + /// 函数 `gateway_claude_messages_stay_on_chatgpt_codex_base` /// /// 作者: gaohongshun