Skip to content

Commit 019d2a1

Browse files
v0.14.3 apiproxy: 58% compression on live Claude Code traffic
Four new compression sources beyond messages-text: - System prompt (top-level field, preserves cache_control hints) - tool_use input (LLM's tool call args in conversation history) - tool_result output (now broken out separately from messages_text) - Aggregate per-block tracking with breakdown logging GET /_stats returns live cumulative compression numbers as JSON. Measured on this very Claude Code session (model=claude-opus-4-7): single request: 1,717,439 -> 724,057 bytes (993 KB saved, 58% reduction) sys=26B msg=98KB tool_result=629KB tool_use_input=293KB cumulative: 3.4 MB -> 1.46 MB (2.35x compression)
1 parent 994159c commit 019d2a1

1 file changed

Lines changed: 187 additions & 56 deletions

File tree

omnimcode-apiproxy/src/main.rs

Lines changed: 187 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -76,13 +76,26 @@ struct Args {
7676
preview_bytes: usize,
7777
}
7878

79+
#[derive(Default, Debug, Clone)]
80+
struct RewriteStats {
81+
requests: u64,
82+
bytes_in: u64,
83+
bytes_out: u64,
84+
blocks_rewritten: u64,
85+
bytes_saved_messages: u64,
86+
bytes_saved_tool_result: u64,
87+
bytes_saved_system: u64,
88+
bytes_saved_tool_use_input: u64,
89+
}
90+
7991
#[derive(Clone)]
8092
struct AppState {
8193
upstream: String,
8294
rewrite_threshold: usize,
8395
preview_bytes: usize,
8496
http: reqwest::Client,
8597
store: Arc<MemoryStore>,
98+
stats: Arc<std::sync::Mutex<RewriteStats>>,
8699
}
87100

88101
#[tokio::main]
@@ -112,10 +125,12 @@ async fn main() -> Result<()> {
112125
.timeout(std::time::Duration::from_secs(300))
113126
.build()?,
114127
store: Arc::new(MemoryStore::from_env()),
128+
stats: Arc::new(std::sync::Mutex::new(RewriteStats::default())),
115129
};
116130

117131
let app = Router::new()
118132
.route("/v1/messages", post(handle_messages))
133+
.route("/_stats", axum::routing::get(stats_endpoint))
119134
.fallback(any(passthrough))
120135
.with_state(state);
121136

@@ -151,18 +166,36 @@ async fn handle_messages(State(state): State<AppState>, req: Request) -> Respons
151166
// expand-tool-use interception loop (which requires parsing the full response)
152167
// and just pass the SSE chunks straight through.
153168
let rewritten = match rewrite_request_body(&body_bytes, &state) {
154-
Ok(b) => b,
169+
Ok((b, outcome)) => {
170+
if outcome.any() {
171+
info!("rewrote request: {} → {} bytes ({:+} bytes saved across {} blocks) | \
172+
sys={}B msg={}B tool_result={}B tool_use_input={}B",
173+
body_bytes.len(), b.len(), -((body_bytes.len() - b.len()) as i64),
174+
outcome.rewritten_count,
175+
outcome.bytes_system, outcome.bytes_messages_text,
176+
outcome.bytes_tool_result, outcome.bytes_tool_use_input);
177+
}
178+
// Update cumulative stats
179+
{
180+
let mut s = state.stats.lock().unwrap();
181+
s.requests += 1;
182+
s.bytes_in += body_bytes.len() as u64;
183+
s.bytes_out += b.len() as u64;
184+
s.blocks_rewritten += outcome.rewritten_count as u64;
185+
s.bytes_saved_messages += outcome.bytes_messages_text as u64;
186+
s.bytes_saved_tool_result += outcome.bytes_tool_result as u64;
187+
s.bytes_saved_system += outcome.bytes_system as u64;
188+
s.bytes_saved_tool_use_input += outcome.bytes_tool_use_input as u64;
189+
}
190+
b
191+
}
155192
Err(e) => {
156193
warn!("rewrite failed, passing original through: {}", e);
157194
body_bytes.clone()
158195
}
159196
};
160197

161-
let saved = body_bytes.len() as i64 - rewritten.len() as i64;
162-
if saved > 0 {
163-
info!("rewrote request: {} → {} bytes ({:+} bytes saved)",
164-
body_bytes.len(), rewritten.len(), -saved);
165-
}
198+
let _saved_unused = body_bytes.len() as i64 - rewritten.len() as i64;
166199

167200
if is_streaming {
168201
// SSE response: just pass through. The LLM can still emit the expand
@@ -378,101 +411,199 @@ fn is_streaming_request(body: &[u8]) -> bool {
378411
.unwrap_or(false)
379412
}
380413

381-
/// Walk `messages[].content[]` for text blocks above the threshold and
382-
/// replace each with a `<omc:ref/>` marker. Inject the expand tool into
383-
/// the request's `tools` array.
414+
/// Per-request rewrite outcome — what was compressed and by how much, broken
415+
/// down by source so the operator can see at a glance whether system prompts,
416+
/// historical tool_results, or LLM tool_use inputs are the dominant savings.
417+
#[derive(Default, Debug)]
418+
struct RewriteOutcome {
419+
rewritten_count: usize,
420+
bytes_messages_text: usize,
421+
bytes_tool_result: usize,
422+
bytes_system: usize,
423+
bytes_tool_use_input: usize,
424+
}
425+
426+
impl RewriteOutcome {
427+
fn total_saved(&self) -> usize {
428+
self.bytes_messages_text + self.bytes_tool_result
429+
+ self.bytes_system + self.bytes_tool_use_input
430+
}
431+
fn any(&self) -> bool { self.rewritten_count > 0 }
432+
}
433+
434+
/// v0.14.3 — live cumulative-stats endpoint. `curl http://localhost:8090/_stats`
435+
async fn stats_endpoint(State(state): State<AppState>) -> Response {
436+
let s = state.stats.lock().unwrap().clone();
437+
let ratio = if s.bytes_out > 0 {
438+
s.bytes_in as f64 / s.bytes_out as f64
439+
} else { 0.0 };
440+
let total_saved = s.bytes_saved_messages + s.bytes_saved_tool_result
441+
+ s.bytes_saved_system + s.bytes_saved_tool_use_input;
442+
let json = serde_json::to_string_pretty(&serde_json::json!({
443+
"requests_processed": s.requests,
444+
"bytes_in_total": s.bytes_in,
445+
"bytes_out_total": s.bytes_out,
446+
"bytes_saved_total": total_saved,
447+
"compression_ratio": ratio,
448+
"blocks_rewritten": s.blocks_rewritten,
449+
"bytes_saved_by_source": {
450+
"messages_text": s.bytes_saved_messages,
451+
"tool_result": s.bytes_saved_tool_result,
452+
"system_prompt": s.bytes_saved_system,
453+
"tool_use_input": s.bytes_saved_tool_use_input,
454+
}
455+
})).unwrap();
456+
(StatusCode::OK,
457+
[(axum::http::header::CONTENT_TYPE, HeaderValue::from_static("application/json"))],
458+
json).into_response()
459+
}
460+
461+
/// Walk the request body and rewrite every eligible large block.
462+
///
463+
/// What gets rewritten (each independently):
464+
/// - `messages[].content` — string form or array-of-blocks form, except
465+
/// the LAST user message (kept intact so the LLM sees the current ask)
466+
/// - `messages[].content[]` of type `tool_result` — the `content` field
467+
/// - `messages[].content[]` of type `tool_use` — the JSON-serialized
468+
/// `input` field when its serialized form exceeds threshold; this
469+
/// catches the LLM's own large tool arguments (e.g., Write file content)
470+
/// - `system` (top-level): if a string, rewrites it as a single block; if
471+
/// an array, walks each `{type: "text", text: ...}` element. Critically
472+
/// PRESERVES the `cache_control` field on each element so Anthropic's
473+
/// prompt-cache layer still works on the rewritten form.
384474
///
385475
/// Safety rule: the LAST user message is never rewritten — that's the
386-
/// user's current intent, and replacing it with a marker would force the
387-
/// LLM to spend a round-trip expanding it just to know what was asked.
388-
fn rewrite_request_body(body: &[u8], state: &AppState) -> Result<Bytes> {
476+
/// user's current intent.
477+
fn rewrite_request_body(body: &[u8], state: &AppState) -> Result<(Bytes, RewriteOutcome)> {
389478
let mut v: Value = serde_json::from_slice(body)?;
479+
let mut out = RewriteOutcome::default();
480+
481+
// ---- system prompt (top-level field) ----
482+
if let Some(system) = v.get_mut("system") {
483+
match system {
484+
Value::String(s) => {
485+
if s.len() >= state.rewrite_threshold {
486+
if let Ok(marker) = make_marker(s, state) {
487+
out.bytes_system += s.len();
488+
out.rewritten_count += 1;
489+
*system = Value::String(marker);
490+
}
491+
}
492+
}
493+
Value::Array(blocks) => {
494+
for block in blocks.iter_mut() {
495+
if block.get("type").and_then(Value::as_str) == Some("text") {
496+
let Some(text) = block.get("text").and_then(Value::as_str) else { continue };
497+
if text.len() < state.rewrite_threshold { continue; }
498+
let Ok(marker) = make_marker(text, state) else { continue };
499+
out.bytes_system += text.len();
500+
out.rewritten_count += 1;
501+
// Mutate ONLY the `text` field; preserve cache_control + everything else
502+
block["text"] = Value::String(marker);
503+
}
504+
}
505+
}
506+
_ => {}
507+
}
508+
}
509+
510+
// ---- messages array ----
390511
let Some(messages) = v.get_mut("messages").and_then(Value::as_array_mut) else {
391-
anyhow::bail!("no 'messages' array in request");
512+
// No messages? Just system rewriting may have happened — return what we have.
513+
let bytes = Bytes::from(serde_json::to_vec(&v)?);
514+
return Ok((bytes, out));
392515
};
393-
394-
// Find the index of the last user message — protect it from rewriting.
395516
let last_user_idx = messages.iter().enumerate().rev()
396517
.find(|(_, m)| m.get("role").and_then(Value::as_str) == Some("user"))
397518
.map(|(i, _)| i);
398519

399-
let mut rewritten_count = 0usize;
400-
let mut bytes_replaced = 0usize;
401-
402520
for (idx, msg) in messages.iter_mut().enumerate() {
403521
if Some(idx) == last_user_idx { continue; }
404-
let content = msg.get_mut("content");
522+
let Some(content) = msg.get_mut("content") else { continue };
405523
match content {
406-
Some(Value::String(s)) => {
524+
Value::String(s) => {
407525
if s.len() >= state.rewrite_threshold {
408526
if let Ok(marker) = make_marker(s, state) {
409-
bytes_replaced += s.len();
410-
*content.unwrap() = Value::String(marker);
411-
rewritten_count += 1;
527+
out.bytes_messages_text += s.len();
528+
out.rewritten_count += 1;
529+
*content = Value::String(marker);
412530
}
413531
}
414532
}
415-
Some(Value::Array(blocks)) => {
533+
Value::Array(blocks) => {
416534
for block in blocks.iter_mut() {
417-
if block.get("type").and_then(Value::as_str) == Some("text") {
418-
if let Some(text) = block.get("text").and_then(Value::as_str) {
419-
if text.len() >= state.rewrite_threshold {
420-
if let Ok(marker) = make_marker(text, state) {
421-
bytes_replaced += text.len();
422-
block["text"] = Value::String(marker);
423-
rewritten_count += 1;
424-
}
535+
let block_type = block.get("type").and_then(Value::as_str).unwrap_or("");
536+
match block_type {
537+
"text" => {
538+
let Some(text) = block.get("text").and_then(Value::as_str) else { continue };
539+
if text.len() < state.rewrite_threshold { continue; }
540+
let Ok(marker) = make_marker(text, state) else { continue };
541+
out.bytes_messages_text += text.len();
542+
out.rewritten_count += 1;
543+
block["text"] = Value::String(marker);
544+
}
545+
"tool_result" => {
546+
if let Some(inner) = block.get_mut("content") {
547+
rewrite_tool_result_content(inner, state, &mut out);
425548
}
426549
}
427-
}
428-
// tool_result blocks carry a `content` field which can be a
429-
// string or an array of {type, text}. Same rewrite rule.
430-
if block.get("type").and_then(Value::as_str) == Some("tool_result") {
431-
if let Some(inner) = block.get_mut("content") {
432-
rewrite_tool_result_content(inner, state,
433-
&mut rewritten_count, &mut bytes_replaced);
550+
"tool_use" => {
551+
// Compress big `input` JSON (e.g., Write/Edit
552+
// calls where the LLM emitted file content).
553+
if let Some(input) = block.get_mut("input") {
554+
let serialized = serde_json::to_string(input)
555+
.unwrap_or_default();
556+
if serialized.len() >= state.rewrite_threshold {
557+
if let Ok(marker) = make_marker(&serialized, state) {
558+
out.bytes_tool_use_input += serialized.len();
559+
out.rewritten_count += 1;
560+
// Wrap marker as an object so the JSON
561+
// remains structurally an object — many
562+
// LLM clients assume `input` is a dict.
563+
*input = serde_json::json!({
564+
"_omc_compressed_input_marker": marker
565+
});
566+
}
567+
}
568+
}
434569
}
570+
_ => {}
435571
}
436572
}
437573
}
438574
_ => {}
439575
}
440576
}
441577

442-
if rewritten_count > 0 {
578+
if out.any() {
443579
inject_expand_tool(&mut v);
444-
debug!("rewrote {} blocks, replaced {} bytes", rewritten_count, bytes_replaced);
445580
}
446-
447-
let out = serde_json::to_vec(&v)?;
448-
Ok(Bytes::from(out))
581+
let bytes = Bytes::from(serde_json::to_vec(&v)?);
582+
Ok((bytes, out))
449583
}
450584

451585
fn rewrite_tool_result_content(
452-
inner: &mut Value, state: &AppState, count: &mut usize, bytes: &mut usize,
586+
inner: &mut Value, state: &AppState, out: &mut RewriteOutcome,
453587
) {
454588
match inner {
455589
Value::String(s) => {
456590
if s.len() >= state.rewrite_threshold {
457591
if let Ok(marker) = make_marker(s, state) {
458-
*bytes += s.len();
459-
*count += 1;
592+
out.bytes_tool_result += s.len();
593+
out.rewritten_count += 1;
460594
*inner = Value::String(marker);
461595
}
462596
}
463597
}
464598
Value::Array(parts) => {
465599
for part in parts.iter_mut() {
466600
if part.get("type").and_then(Value::as_str) == Some("text") {
467-
if let Some(text) = part.get("text").and_then(Value::as_str) {
468-
if text.len() >= state.rewrite_threshold {
469-
if let Ok(marker) = make_marker(text, state) {
470-
*bytes += text.len();
471-
*count += 1;
472-
part["text"] = Value::String(marker);
473-
}
474-
}
475-
}
601+
let Some(text) = part.get("text").and_then(Value::as_str) else { continue };
602+
if text.len() < state.rewrite_threshold { continue; }
603+
let Ok(marker) = make_marker(text, state) else { continue };
604+
out.bytes_tool_result += text.len();
605+
out.rewritten_count += 1;
606+
part["text"] = Value::String(marker);
476607
}
477608
}
478609
}

0 commit comments

Comments
 (0)