Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 212 additions & 7 deletions crates/service/src/gateway/observability/http_bridge/delivery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ use crate::gateway::upstream::GatewayStreamResponse;

use super::super::{GeminiStreamOutputMode, ResponseAdapter, ToolNameRestoreMap};
use super::{
build_images_api_response, collect_image_generation_chat_images,
append_output_text, build_images_api_response, collect_image_generation_chat_images,
collect_non_stream_json_from_sse_bytes, collect_response_reasoning_summary_text,
extract_error_hint_from_body, extract_error_message_from_json, looks_like_sse_payload,
merge_usage, parse_usage_from_json, push_trace_id_header, usage_has_signal, AnthropicSseReader,
ChatCompletionsFromResponsesSseReader, GeminiSseReader, ImagesFromResponsesSseReader,
ImagesResponseFormat, OpenAIResponsesPassthroughSseReader, PassthroughSseCollector,
PassthroughSseProtocol, PassthroughSseUsageReader, SseKeepAliveFrame,
UpstreamResponseBridgeResult, UpstreamResponseUsage,
PassthroughSseProtocol, PassthroughSseUsageReader, ResponsesFromAnthropicSseReader,
SseKeepAliveFrame, UpstreamResponseBridgeResult, UpstreamResponseUsage,
};

const REQUEST_ID_HEADER_CANDIDATES: &[&str] = &["x-request-id", "x-oai-request-id"];
Expand Down Expand Up @@ -153,6 +153,14 @@ where
)
}

/// Test-only predicate reporting whether `response_adapter` is
/// `ResponseAdapter::ResponsesFromAnthropicMessages`.
///
/// Returns `true` for that single variant and `false` for every other one.
/// Compiled only under `cfg(test)`.
///
/// NOTE(review): the name suggests this mirrors which adapters are delivered
/// through the manual chunked streaming path; this function itself only checks
/// the variant, so keep it in sync with the delivery code by hand.
#[cfg(test)]
fn response_adapter_uses_manual_chunked_streaming(response_adapter: ResponseAdapter) -> bool {
matches!(
response_adapter,
ResponseAdapter::ResponsesFromAnthropicMessages
)
}

/// 函数 `compact_debug_suffix`
///
/// 作者: gaohongshun
Expand Down Expand Up @@ -342,6 +350,33 @@ fn anthropic_usage_from_responses(value: &Value) -> Value {
Value::Object(obj)
}

/// Maps the `usage` member of an Anthropic Messages response onto the usage
/// object shape used by OpenAI Responses payloads.
///
/// `value` is the full Anthropic response body. Every counter that is missing,
/// or is not an integer representable as `i64`, is reported as `0`, so the
/// returned object always carries all five fields.
///
/// NOTE(review): `input_tokens` is forwarded unchanged while
/// `cache_read_input_tokens` is reported as `cached_tokens`. If the upstream
/// counts cache reads separately from `input_tokens`, `cached_tokens` can
/// exceed `input_tokens` here — confirm whether cache reads should be folded
/// into `input_tokens` / `total_tokens`.
fn responses_usage_from_anthropic(value: &Value) -> Value {
    // Borrow the usage object instead of cloning it; when it is absent every
    // lookup below simply falls back to zero.
    let usage = value.get("usage");
    let token_count = |key: &str| -> i64 {
        usage
            .and_then(|usage| usage.get(key))
            .and_then(Value::as_i64)
            .unwrap_or_default()
    };

    let input_tokens = token_count("input_tokens");
    let output_tokens = token_count("output_tokens");
    let cached_tokens = token_count("cache_read_input_tokens");
    let reasoning_tokens = token_count("reasoning_output_tokens");

    json!({
        "input_tokens": input_tokens,
        "output_tokens": output_tokens,
        // The counters come from the upstream body, so saturate rather than
        // overflow (panic in debug builds, wrap-around in release builds).
        "total_tokens": input_tokens.saturating_add(output_tokens),
        "input_tokens_details": { "cached_tokens": cached_tokens },
        "output_tokens_details": { "reasoning_tokens": reasoning_tokens },
    })
}

fn gemini_usage_from_responses(value: &Value) -> Value {
let usage = value.get("usage").cloned().unwrap_or(Value::Null);
let prompt = usage
Expand Down Expand Up @@ -496,6 +531,77 @@ fn convert_responses_body_to_anthropic_messages(
serde_json::to_vec(&payload).ok()
}

fn convert_anthropic_messages_body_to_responses(body: &[u8]) -> Option<Vec<u8>> {
let value = serde_json::from_slice::<Value>(body).ok()?;
let response_id = value
.get("id")
.and_then(Value::as_str)
.unwrap_or("resp_codexmanager");
let model = value.get("model").and_then(Value::as_str).unwrap_or("");
let mut output = Vec::new();
let mut output_text = String::new();
if let Some(content) = value.get("content").and_then(Value::as_array) {
for item in content {
let Some(obj) = item.as_object() else {
continue;
};
match obj.get("type").and_then(Value::as_str).unwrap_or_default() {
"text" => {
if let Some(text) = obj.get("text").and_then(Value::as_str) {
append_output_text(&mut output_text, text);
}
}
"tool_use" => {
output.push(json!({
"id": obj
.get("id")
.and_then(Value::as_str)
.unwrap_or("toolu_unknown"),
"type": "function_call",
"status": "completed",
"call_id": obj
.get("id")
.and_then(Value::as_str)
.unwrap_or("toolu_unknown"),
"name": obj
.get("name")
.and_then(Value::as_str)
.unwrap_or("tool"),
"arguments": obj
.get("input")
.cloned()
.unwrap_or_else(|| json!({}))
.to_string(),
}));
}
_ => {}
}
}
}
if !output_text.is_empty() {
output.insert(
0,
json!({
"id": format!("msg_{response_id}"),
"type": "message",
"status": "completed",
"role": "assistant",
"content": [{ "type": "output_text", "text": output_text }],
}),
);
}
let payload = json!({
"id": response_id,
"object": "response",
"created_at": 0,
"status": "completed",
"model": model,
"output": output,
"usage": responses_usage_from_anthropic(&value),
});
serde_json::to_vec(&payload).ok()
}

fn convert_responses_body_to_gemini_generate_content(
body: &[u8],
wrap_response_envelope: bool,
Expand Down Expand Up @@ -753,6 +859,9 @@ fn convert_success_body_for_adapter(
ResponseAdapter::AnthropicMessagesFromResponses => {
convert_responses_body_to_anthropic_messages(body, tool_name_restore_map)
}
ResponseAdapter::ResponsesFromAnthropicMessages => {
convert_anthropic_messages_body_to_responses(body)
}
ResponseAdapter::ChatCompletionsFromResponses => {
convert_responses_body_to_chat_completions(body)
}
Expand Down Expand Up @@ -1040,7 +1149,8 @@ fn convert_error_body_for_adapter(response_adapter: ResponseAdapter, message: &s
ResponseAdapter::AnthropicMessagesFromResponses => {
convert_upstream_error_to_anthropic_body(message)
}
ResponseAdapter::ChatCompletionsFromResponses => serde_json::to_vec(&json!({
ResponseAdapter::ResponsesFromAnthropicMessages
| ResponseAdapter::ChatCompletionsFromResponses => serde_json::to_vec(&json!({
"error": {
"message": message,
"type": "upstream_error",
Expand Down Expand Up @@ -1080,7 +1190,8 @@ fn compatibility_stream_content_type(
) -> &'static str {
match response_adapter {
ResponseAdapter::AnthropicMessagesFromResponses => "text/event-stream",
ResponseAdapter::ChatCompletionsFromResponses => "text/event-stream",
ResponseAdapter::ResponsesFromAnthropicMessages
| ResponseAdapter::ChatCompletionsFromResponses => "text/event-stream",
ResponseAdapter::CompactFromChatCompletions => "application/json",
ResponseAdapter::ImagesB64JsonFromResponses | ResponseAdapter::ImagesUrlFromResponses => {
"text/event-stream"
Expand Down Expand Up @@ -2174,6 +2285,46 @@ pub(crate) fn respond_with_upstream(
None,
));
}
ResponseAdapter::ResponsesFromAnthropicMessages => {
let usage_collector = Arc::new(Mutex::new(UpstreamResponseUsage::default()));
let response_body: Box<dyn std::io::Read + Send> =
Box::new(ResponsesFromAnthropicSseReader::new(
upstream,
Arc::clone(&usage_collector),
fallback_model,
request_started_at,
));
let delivery_error =
respond_streaming_chunked(request, status, headers, response_body)
.err()
.map(|err| err.to_string());
let usage = usage_collector
.lock()
.map(|guard| guard.clone())
.unwrap_or_default();
return Ok(with_bridge_debug_meta(
UpstreamResponseBridgeResult {
usage,
stream_terminal_seen: true,
stream_terminal_error: None,
delivery_error,
upstream_error_hint: None,
delivered_status_code: None,
upstream_request_id: None,
upstream_cf_ray: None,
upstream_auth_error: None,
upstream_identity_error_code: None,
upstream_content_type: None,
last_sse_event_type: None,
},
&upstream_request_id,
&upstream_cf_ray,
&upstream_auth_error,
&upstream_identity_error_code,
&upstream_content_type,
None,
));
}
ResponseAdapter::ChatCompletionsFromResponses => {
let usage_collector = Arc::new(Mutex::new(PassthroughSseCollector::default()));
let response_body: Box<dyn std::io::Read + Send> =
Expand Down Expand Up @@ -2900,6 +3051,7 @@ pub(crate) fn respond_with_upstream(
))
}
ResponseAdapter::AnthropicMessagesFromResponses
| ResponseAdapter::ResponsesFromAnthropicMessages
| ResponseAdapter::ChatCompletionsFromResponses
| ResponseAdapter::ImagesB64JsonFromResponses
| ResponseAdapter::ImagesUrlFromResponses
Expand Down Expand Up @@ -3151,6 +3303,49 @@ pub(crate) fn respond_with_stream_upstream(
None,
));
}
ResponseAdapter::ResponsesFromAnthropicMessages => {
let upstream_body = upstream
.read_all_bytes()
.map_err(|err| format!("read upstream body failed: {err}"))?;
let usage_collector = Arc::new(Mutex::new(UpstreamResponseUsage::default()));
let response_body: Box<dyn std::io::Read + Send> =
Box::new(ResponsesFromAnthropicSseReader::from_reader(
std::io::Cursor::new(upstream_body.to_vec()),
Arc::clone(&usage_collector),
fallback_model,
request_started_at,
));
let delivery_error =
respond_streaming_chunked(request, status, headers, response_body)
.err()
.map(|err| err.to_string());
let usage = usage_collector
.lock()
.map(|guard| guard.clone())
.unwrap_or_default();
return Ok(with_bridge_debug_meta(
UpstreamResponseBridgeResult {
usage,
stream_terminal_seen: true,
stream_terminal_error: None,
delivery_error,
upstream_error_hint: None,
delivered_status_code: None,
upstream_request_id: None,
upstream_cf_ray: None,
upstream_auth_error: None,
upstream_identity_error_code: None,
upstream_content_type: None,
last_sse_event_type: None,
},
&upstream_request_id,
&upstream_cf_ray,
&upstream_auth_error,
&upstream_identity_error_code,
&upstream_content_type,
None,
));
}
ResponseAdapter::ChatCompletionsFromResponses => {
let upstream_body = upstream
.read_all_bytes()
Expand Down Expand Up @@ -3846,6 +4041,7 @@ pub(crate) fn respond_with_stream_upstream(
))
}
ResponseAdapter::AnthropicMessagesFromResponses
| ResponseAdapter::ResponsesFromAnthropicMessages
| ResponseAdapter::ChatCompletionsFromResponses
| ResponseAdapter::ImagesB64JsonFromResponses
| ResponseAdapter::ImagesUrlFromResponses
Expand Down Expand Up @@ -3881,6 +4077,7 @@ fn resolve_stream_keepalive_frame(
}
}
ResponseAdapter::AnthropicMessagesFromResponses
| ResponseAdapter::ResponsesFromAnthropicMessages
| ResponseAdapter::ChatCompletionsFromResponses
| ResponseAdapter::CompactFromChatCompletions
| ResponseAdapter::ImagesB64JsonFromResponses
Expand All @@ -3901,8 +4098,9 @@ mod tests {
convert_responses_body_to_chat_completions,
convert_responses_body_to_gemini_generate_content, convert_responses_body_to_images,
force_openai_responses_stream_content_type, gemini_cli_wrap_response_envelope,
merge_usage_from_body_without_output_text, write_streaming_chunked_response, HTTPVersion,
Header, ImagesResponseFormat, ResponseAdapter, StatusCode,
merge_usage_from_body_without_output_text, response_adapter_uses_manual_chunked_streaming,
write_streaming_chunked_response, HTTPVersion, Header, ImagesResponseFormat,
ResponseAdapter, StatusCode,
};
use serde_json::json;
use std::io::{Read, Write};
Expand Down Expand Up @@ -3979,6 +4177,13 @@ mod tests {
assert!(writer.flushes >= 4);
}

/// The Anthropic-to-Responses adapter must be reported as using manual
/// chunked streaming by `response_adapter_uses_manual_chunked_streaming`.
#[test]
fn responses_from_anthropic_streaming_uses_manual_chunked_delivery() {
    let adapter = ResponseAdapter::ResponsesFromAnthropicMessages;
    assert!(response_adapter_uses_manual_chunked_streaming(adapter));
}

/// 函数 `compact_header_only_identity_error_is_normalized_and_classified`
///
/// 作者: gaohongshun
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ pub(super) fn respond_with_upstream(
pub(super) use stream_readers::{
ChatCompletionsFromResponsesSseReader, ImagesFromResponsesSseReader,
OpenAIResponsesPassthroughSseReader, PassthroughSseCollector, PassthroughSseUsageReader,
SseKeepAliveFrame,
ResponsesFromAnthropicSseReader, SseKeepAliveFrame,
};

pub(super) use stream_readers::{AnthropicSseReader, GeminiSseReader};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ mod images;
mod openai_responses;
#[path = "stream_readers/passthrough.rs"]
mod passthrough;
#[path = "stream_readers/responses_from_anthropic.rs"]
mod responses_from_anthropic;

pub(crate) use anthropic::AnthropicSseReader;
pub(crate) use chat_completions::ChatCompletionsFromResponsesSseReader;
Expand All @@ -43,6 +45,7 @@ pub(crate) use gemini::GeminiSseReader;
pub(crate) use images::ImagesFromResponsesSseReader;
pub(crate) use openai_responses::OpenAIResponsesPassthroughSseReader;
pub(crate) use passthrough::PassthroughSseUsageReader;
pub(crate) use responses_from_anthropic::ResponsesFromAnthropicSseReader;

/// 函数 `reload_from_env`
///
Expand Down
Loading