fix: stabilize Anthropic-native streaming, timeout handling, and fallback cancellation #80
The streaming path in handleStreaming was missing a branch for Go
provider models that use the Anthropic endpoint (minimax-m3,
qwen3.5-plus, qwen3.6-plus, qwen3.7-plus). These requests fell
through to the OpenAI transform path, which sent the body to
the Anthropic /v1/messages endpoint with OpenAI-formatted tools
({"type":"function","function":{...}}). The MiniMax upstream
rejects this with 400 'function name or parameters is empty (2013)'.
Mirrors the non-streaming path: send the raw Anthropic body to
https://opencode.ai/zen/go/v1/messages, swapping the model field
so the routed model (not the user-requested claude-* alias) reaches
upstream.
Also hardens replaceModelInRawBody: previous string-search
implementation only matched the compact form '"model":"<id>"'
and silently passed through any whitespace-padded form
'"model": "<id>"', which would cause the wrong model to reach
upstream. New JSON-based implementation handles both forms and
falls back to the original body on parse failure (missing model
key, invalid JSON, non-string model value).
Adds regression tests:
- TestHandleStreaming_GoAnthropicModel_SendsRawAnthropicBody
- TestHandleStreaming_GoAnthropicModel_FallsThroughOnError
- TestHandleMessages_StreamingMinimaxM3_UsesAnthropicEndpoint
- TestHandleNonStreaming_GoAnthropicModel_ReplacesModelInBody
- TestReplaceModelInRawBody_{JSONBased,HandlesWhitespace,
ReturnsOriginalWhenModelMissing,ReturnsOriginalOnInvalidJSON,
HandlesNestedObjects}
Co-authored-by: factory-droid[bot] <138933559+factory-droid[bot]@users.noreply.github.com>
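For readers who want a concrete picture of the JSON-based replacement described in the commit above, here is a minimal sketch. The name replaceModel and its signature are assumptions made for illustration; the actual replaceModelInRawBody in this PR may be structured differently. The sketch parses only the top level of the body, swaps the model string, and returns the input untouched for invalid JSON, a missing model key, or a non-string model value.

```go
package main

import "encoding/json"

// replaceModel is a hypothetical stand-in for replaceModelInRawBody.
// It returns body unchanged when body is not a JSON object, has no
// "model" key, or holds a non-string "model" value.
func replaceModel(body []byte, routed string) []byte {
	var fields map[string]json.RawMessage
	if err := json.Unmarshal(body, &fields); err != nil {
		return body // invalid JSON, or valid JSON that is not an object
	}
	raw, ok := fields["model"]
	if !ok {
		return body // missing model key
	}
	// A *string stays nil for JSON null and fails for numbers, objects, arrays.
	var current *string
	if err := json.Unmarshal(raw, &current); err != nil || current == nil {
		return body // non-string model value
	}
	encoded, err := json.Marshal(routed)
	if err != nil {
		return body
	}
	fields["model"] = encoded
	out, err := json.Marshal(fields)
	if err != nil {
		return body
	}
	return out
}
```

Because the body is re-marshalled from a map, keys come out sorted and whitespace is compacted, so the result is semantically equal to the input rather than byte-identical. Nested values are carried through as raw JSON and are not re-interpreted.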
OpenAI-compatible upstreams (kimi, glm, mimo, qwen) reject requests
whose tools[].function.parameters is an empty object or missing
required JSON Schema fields. The previous transformTools emitted
parameters: {} for any tool with an empty or absent input_schema,
and forwarded malformed schemas verbatim when JSON parsing failed.
New transformTools:
- Skips tools whose name is empty or whitespace-only
- For empty / null / '{}' input_schema, emits the canonical
{"type":"object","properties":{},"additionalProperties":false}
- For valid schemas missing 'type' or 'properties', adds them
defensively (matches what most upstreams expect)
- For invalid JSON schemas, falls back to the canonical default
rather than forwarding broken bytes
- Preserves additionalProperties when the caller explicitly set it
Adds regression tests:
- TestTransformTools_SkipsEmptyName
- TestTransformTools_SkipsWhitespaceOnlyName
- TestTransformTools_FillsEmptySchema
- TestTransformTools_FillsNullSchema
- TestTransformTools_FillsEmptyObjectSchema
- TestTransformTools_FillsMissingType
- TestTransformTools_FillsMissingProperties
- TestTransformTools_RecoversFromInvalidJSON
- TestTransformTools_PreservesValidSchema
- TestTransformTools_PreservesAdditionalPropertiesWhenSet
Co-authored-by: factory-droid[bot] <138933559+factory-droid[bot]@users.noreply.github.com>
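The schema rules listed in the commit above can be sketched roughly as follows. This is an illustration under stated assumptions, not the PR's transformTools: the helper name normalizeSchema and the constant defaultSchema are hypothetical, and skipping tools with empty names is left out to keep the example short.

```go
package main

import (
	"bytes"
	"encoding/json"
)

// defaultSchema is the canonical empty-object schema quoted in the commit message.
const defaultSchema = `{"type":"object","properties":{},"additionalProperties":false}`

// normalizeSchema is a hypothetical helper that applies the fill-in rules
// to one tool's input_schema.
func normalizeSchema(raw []byte) []byte {
	if len(bytes.TrimSpace(raw)) == 0 {
		return []byte(defaultSchema) // empty or absent schema
	}
	var obj map[string]any
	if err := json.Unmarshal(raw, &obj); err != nil || len(obj) == 0 {
		// Invalid JSON, "null" (nil map), and "{}" all get the default.
		return []byte(defaultSchema)
	}
	if _, ok := obj["type"]; !ok {
		obj["type"] = "object"
	}
	if _, ok := obj["properties"]; !ok {
		obj["properties"] = map[string]any{}
	}
	// additionalProperties is left exactly as the caller set it.
	out, err := json.Marshal(obj)
	if err != nil {
		return []byte(defaultSchema)
	}
	return out
}
```

Checking len(obj) == 0 rather than only the error also covers a nil map, which is what a JSON null decodes to; writing into a nil map would otherwise panic.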
When the assistant's first output is a direct tool call (no preceding text or reasoning), the previous code emitted content_block_start with index 1 because *contentIndex++ ran before blockIdx = *contentIndex. Anthropic's SSE spec requires content block indices to start at 0 and be sequential; an index-1 block without an index-0 block causes the SDK to reject the response with InvalidHTTPResponse.
The fix in processSSELine:
- When closing an open text/reasoning block before starting a tool_use, advance contentIndex past the closed block first
- Then set blockIdx = *contentIndex (the next free index)
- Then advance contentIndex again to reserve for the new block
For the first-block case (no preceding text), the if-block is skipped, blockIdx is the current *contentIndex (0), and the new tool_use lands at index 0 as required.
Existing tests TestProxyStream_SingleToolCall and TestProxyStream_ToolCallAndFinishReasonInSameChunk are updated to assert the new index-0 behavior. Adds a dedicated TestProxyStream_ToolUseFirstContentBlock that exercises the previously-broken path end-to-end.
Co-authored-by: factory-droid[bot] <138933559+factory-droid[bot]@users.noreply.github.com>
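The index bookkeeping described in this commit can be illustrated with a small sketch. The streamState type and startToolUse method are hypothetical simplifications; the real processSSELine works on a *contentIndex pointer and emits SSE events, which are omitted here.

```go
package main

// streamState is a hypothetical, simplified view of the per-stream state.
type streamState struct {
	contentIndex int  // index of the open block, or the next free index
	blockOpen    bool // true while a text or reasoning block is open
}

// startToolUse returns the index to use for a new tool_use block.
func (s *streamState) startToolUse() int {
	if s.blockOpen {
		// Close the open text/reasoning block and move past it.
		s.blockOpen = false
		s.contentIndex++
	}
	blockIdx := s.contentIndex // the next free index
	s.contentIndex++           // reserve it for the new tool_use block
	return blockIdx
}
```

With no preceding block the first tool_use gets index 0; with an open text block at index 0 it gets index 1, and later tool calls continue sequentially from there.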
Added
- provider-level streaming timeout configuration for OpenCode Go and OpenCode Zen
- request timeout helpers for per-model request and streaming attempts
- fallback handler coverage for cancellation, deadline, and timeout behavior
Fixed
- streaming requests now derive attempt timeouts from the client request context
- non-streaming fallback stops cleanly on parent cancellation instead of returning false 502 all models failed responses
- long-running SSE responses are no longer capped by a shorter server write timeout or global http client timeout
- Anthropic raw streaming pauses proxy keepalives and serializes writes to avoid SSE corruption and JSON parse EOF failures
- hot reload timeout warnings now reflect immediate effect for Go and Zen timeout updates
Changed
- example config documents streaming_timeout_ms for both upstream providers
- streaming and non-streaming handlers now distinguish client cancellation from upstream model failure in logs
Refactored
- shortened over-detailed comments in touched handlers, transformer, client, and config code
- replaceModelInRawBody keeps using JSON-based replacement with a smaller surface explanation
Tests
- added timeout helper tests for Go and Zen providers
- added fallback tests for canceled contexts, parent deadlines, circuit-breaker accounting, and per-model timeout fallback
- added handler tests for configurable streaming timeout, cancellation-aware fallback, Anthropic raw passthrough, and concurrent writer behavior
- kept full go test ./... green before commit
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 86bb0ff45e
if heartbeatPaused.Load() == 1 {
    continue
Flush raw Anthropic streams while suppressing heartbeats
When the selected model uses the raw Anthropic path, handleAnthropicStreaming just io.Copys resp.Body into rw and never calls Flush; before this change the periodic heartbeat still flushed the underlying writer every 3s. With this new continue, those ticks do nothing while raw passthrough is active, so normal small SSE events can remain buffered by net/http until the buffer fills or the handler returns, making native Anthropic streaming appear hung and risking downstream idle timeouts. Suppress injected bytes, but still flush after copied chunks or with a flush-only ticker.
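One way to act on this suggestion is a writer that flushes after every copied chunk. The sketch below is a hedged illustration: flushWriter, newFlushWriter, and exampleFlush are names chosen here, and exampleFlush uses an in-memory httptest recorder only to show the effect without a real server.

```go
package main

import (
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
)

// flushWriter forwards writes and flushes after each one, so small SSE
// events are not held in the net/http buffer.
type flushWriter struct {
	w io.Writer
	f http.Flusher // nil when the underlying writer cannot flush
}

func newFlushWriter(w http.ResponseWriter) *flushWriter {
	f, _ := w.(http.Flusher)
	return &flushWriter{w: w, f: f}
}

func (fw *flushWriter) Write(p []byte) (int, error) {
	n, err := fw.w.Write(p)
	if fw.f != nil {
		fw.f.Flush()
	}
	return n, err
}

// exampleFlush copies one SSE event through flushWriter and reports the
// copied body and whether the recorder saw a flush.
func exampleFlush() (body string, flushed bool) {
	rec := httptest.NewRecorder()
	upstream := strings.NewReader("event: ping\ndata: {}\n\n")
	_, _ = io.Copy(newFlushWriter(rec), upstream)
	return rec.Body.String(), rec.Flushed
}
```

In a handler the call would look like io.Copy(newFlushWriter(rw), resp.Body), which keeps heartbeat bytes suppressed while still pushing each upstream chunk to the client.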
// Start heartbeat
var finished int32
heartbeatDone := make(chan struct{})
heartbeatPaused := new(atomic.Int32)
🔥 The Roast: Pausing the heartbeat is the right instinct, but this flag only guards raw Anthropic io.Copy; any future raw passthrough can trip the same landmine.
🩹 The Fix: Wrap any raw stream copy with a helper that pauses heartbeat for the whole copy, instead of manually setting the flag around one endpoint.
📏 Severity: suggestion
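A helper along the lines the reviewer suggests might look like the sketch below. The heartbeat type and its whilePaused and tick methods are hypothetical names; the PR itself stores the flag in a standalone atomic.Int32.

```go
package main

import (
	"io"
	"sync/atomic"
)

// heartbeat is a hypothetical holder for the pause flag.
type heartbeat struct {
	paused atomic.Int32
}

// whilePaused suppresses keepalives for the full duration of fn, for
// example a raw io.Copy of an upstream SSE body.
func (h *heartbeat) whilePaused(fn func() error) error {
	h.paused.Store(1)
	defer h.paused.Store(0)
	return fn()
}

// tick writes one keepalive comment unless the heartbeat is paused.
func (h *heartbeat) tick(w io.Writer) error {
	if h.paused.Load() == 1 {
		return nil
	}
	_, err := io.WriteString(w, ":keepalive\n\n")
	return err
}
```

Any raw passthrough can then be wrapped in whilePaused, so a new endpoint cannot forget to set or clear the flag. Nested calls would need a counter instead of a 0/1 flag; the sketch assumes one raw copy per response.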
defer w.mu.Unlock()
if !w.wroteHeader {
    w.WriteHeader(http.StatusOK)
    w.wroteHeader = true
🔥 The Roast: Setting wroteHeader under the mutex is a good start, but later reads still peek at it bare-handed. The heartbeat goroutine can update this flag while handleStreaming checks it, so the race detector gets a front-row seat.
🩹 The Fix: Add a locked helper like HasWrittenHeader() on responseWriter, use it for the later checks, and keep direct wroteHeader access out of concurrent paths.
📏 Severity: warning
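The locked accessor the reviewer asks for can be sketched as follows. The field layout of responseWriter is assumed from the diff excerpt, and exampleHeaderState is a hypothetical demonstration using an in-memory recorder.

```go
package main

import (
	"net/http"
	"net/http/httptest"
	"sync"
)

// responseWriter is a simplified, assumed version of the proxy's wrapper.
type responseWriter struct {
	http.ResponseWriter
	mu          sync.Mutex
	wroteHeader bool
}

func (w *responseWriter) WriteHeader(code int) {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.wroteHeader {
		return
	}
	w.wroteHeader = true
	w.ResponseWriter.WriteHeader(code)
}

func (w *responseWriter) Write(p []byte) (int, error) {
	w.mu.Lock()
	defer w.mu.Unlock()
	if !w.wroteHeader {
		w.wroteHeader = true
		// Call the embedded writer directly: sync.Mutex is not reentrant,
		// so calling w.WriteHeader here would deadlock.
		w.ResponseWriter.WriteHeader(http.StatusOK)
	}
	return w.ResponseWriter.Write(p)
}

// HasWrittenHeader is the locked read used by concurrent callers.
func (w *responseWriter) HasWrittenHeader() bool {
	w.mu.Lock()
	defer w.mu.Unlock()
	return w.wroteHeader
}

// exampleHeaderState reports the accessor's value before and after the
// first write.
func exampleHeaderState() (before, after bool) {
	w := &responseWriter{ResponseWriter: httptest.NewRecorder()}
	before = w.HasWrittenHeader()
	_, _ = w.Write([]byte("data: {}\n\n"))
	return before, w.HasWrittenHeader()
}
```

With every read and write of wroteHeader behind the same mutex, the heartbeat goroutine and the handler no longer race on the flag.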
schema = []byte(`{"type":"object","properties":{},"additionalProperties":false}`)
default:
    var schemaObj map[string]interface{}
    if err := json.Unmarshal(schema, &schemaObj); err != nil {
🔥 The Roast: This catches invalid JSON, but valid JSON with the wrong shape can still waltz through. A schema like {"type":"string"} or {"properties":[]} will unmarshal happily and get forwarded as tool parameters that OpenAI can reject.
🩹 The Fix: After unmarshalling, validate that type is "object" and properties is an object; otherwise replace with the safe empty-object schema instead of preserving the bad shape.
📏 Severity: suggestion
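The shape check proposed here can be expressed as a small predicate. isObjectSchema is a hypothetical name; a caller would substitute the safe empty-object schema whenever it returns false.

```go
package main

import "encoding/json"

// isObjectSchema reports whether raw is a JSON object whose "type" is
// the string "object" and whose "properties" is itself a JSON object.
func isObjectSchema(raw []byte) bool {
	var obj map[string]any
	if err := json.Unmarshal(raw, &obj); err != nil || obj == nil {
		return false // invalid JSON, or JSON null
	}
	if t, ok := obj["type"].(string); !ok || t != "object" {
		return false
	}
	_, ok := obj["properties"].(map[string]any)
	return ok
}
```

Both examples from the review comment, {"type":"string"} and {"properties":[]}, decode without error but fail this check.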
Code Review Roast 🔥
Verdict: Previous Issues Resolved | Recommendation: No new concerns in incremental diff
🏆 Best part: Holy shit, this PR actually fixed the timeout loophole properly. I need to sit down. The […]
💀 Worst part: Nothing. For real. I've been looking for something to roast and the only viable target is the fact that the fix is obvious in retrospect, which means the original time-bomb was avoidable. But that's on the old code, not this one.
📊 Overall: Like finding out the guy who built a Death Star actually came back and installed a proper fire suppression system. Rare, welcome, and slightly suspicious that it took a near-miss to make it happen.
Files Reviewed (6 files)
Previous Review Summaries (3 snapshots, latest commit d6b4cf9)
Current summary above is authoritative. Previous snapshots are kept for context only.
Previous review (commit d6b4cf9)
Verdict: 2 Issues Found | Recommendation: Address before merge
🏆 Best part: The cancellation handling and circuit-breaker accounting are genuinely solid; I hate admitting that because it makes the remaining timeout gap look even more avoidable.
💀 Worst part: A "per-model streaming timeout" that stops caring once headers arrive is the production incident version of a smoke alarm that only works before the fire starts.
📊 Overall: Strong stabilization PR with two sharp edges left: one timeout loophole and one nil-map panic waiting in the schema sanitizer.
Fix these issues in Kilo Cloud: https://app.kilo.ai/cloud-agent-fork/review/c6dadec2-4873-4951-8107-e9c03c9c34f6
Files Reviewed (16 files)
Previous review
Verdict: 0 Issues Found | Recommendation: Merge
All previous issues have been addressed in this PR.
🏆 Best part: The circuit breaker logic now correctly distinguishes between client-initiated cancellations (which don't poison the breaker) and real upstream failures. This is the kind of production hygiene that keeps incidents from snowballing.
💀 Worst part: None; the sharpest splinters have been sanded down. Even the SSE event index was rejiggered to start tool_use blocks at 0 as the spec demands.
📊 Overall: Like a master craftsman's final pass: the rough edges are gone and the whole thing gleams. The tests added here would make the previous reviewer blush with pride.
Files Reviewed (8 files)
Previous review (commit 86bb0ff)
Verdict: 3 Issues Found | Recommendation: Address before merge
🏆 Best part: The cancellation handling is genuinely tidy: fallback stops on parent cancellation and avoids poisoning circuit breakers with client disconnects, which is the kind of production hygiene I wish I saw more often.
💀 Worst part: The response writer race is the sharpest splinter here; you built a mutex moat, then left […]
📊 Overall: Strong production fix with a few concurrency and validation potholes, like a sports car with excellent brakes and one suspicious tire.
Fix these issues in Kilo Cloud: https://app.kilo.ai/cloud-agent-fork/review/0f34b0d6-52cb-4b57-9ae1-e392a3b697eb
Files Reviewed (14 files)
Reviewed by step-3.7-flash-20260528 · 513,444 tokens
…m flushing, heartbeat scoping
Fixed
- Data race on responseWriter.wroteHeader: add mutex lock in WriteHeader and a locked HasWrittenHeader() accessor for concurrent reads.
- Tool schema shape validation: verify "type" is "object" and "properties" is an object before forwarding to upstream OpenAI-compatible endpoints.
- Raw Anthropic SSE stream buffering: wrap io.Copy with a flushWriter that calls Flush after every write, preventing hung streams during heartbeat suppression.
Refactored
- Scoped heartbeat pause into handleAnthropicStreaming itself (Store(1) / defer Store(0)), removing duplicated Store calls from both call sites so future raw passthrough paths cannot forget to suppress heartbeats.
Co-Authored-By: kilocode-review[bot] <kilocode-review[bot]@users.noreply.github.com>
Co-Authored-By: ChatGPT-Codex-Connector[bot] <chatgpt-codex-connector[bot]@users.noreply.github.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…flows
Claude Code review workflows can dispatch many subagents and tool calls while still using streaming requests. When enable_streaming_scenario_routing is disabled, these requests may be routed to the fast scenario, even if they are tool-heavy. This can cause unreliable behavior with fast models during parallel Agent/tool orchestration.
Document that users should enable enable_streaming_scenario_routing for /review-code, /ultracode, multi-agent dispatch, large diffs, and long-context review sessions so streaming requests can route to complex or long_context models.
}
- streamBody, err := h.client.GetStreamingBody(ctx, model.ModelID, openaiReq, model)
+ streamBody, err := h.client.GetStreamingBody(attemptCtx, model.ModelID, openaiReq, model)
🔥 The Roast: StreamingTimeoutMs is doing a great impression of a timeout until the response headers arrive; after GetStreamingBody returns, ProxyStream is still reading on clientCtx, so a mid-stream stall can sit there forever like a guest who ignores the last-call bell.
🩹 The Fix: Bind the stream-body read to the per-model attempt context, not just request startup. Pass attemptCtx (or a context-bound reader/wrapper) into ProxyStream, and apply the same treatment to the raw Anthropic io.Copy plus Responses/Gemini streaming paths.
📏 Severity: warning
…l schema
Two warnings from kilo-code-bot:
1. StreamingTimeoutMs only guards request startup; once GetStreamingBody
returns, the body read was tied to the request context (no timeout),
so a mid-stream stall could sit forever. Pass the per-model attempt
context into ProxyStream/ProxyResponsesStream/ProxyGeminiStream and
the raw Anthropic io.Copy, and wrap the upstream body with a tiny
ctxio.NewCtxReadCloser so the body Read also respects the deadline.
2. transformTools panicked on valid JSON that unmarshals to a nil map
(e.g. " null " with decorative whitespace). Treat that case the same
as a successful parse of "{}" — fall back to the default schema.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Thanks a lot for working on this, @hungcuong9125. Let me know if you need help with these merge conflicts; I introduced most of them and I'd be happy to fix them.
@hungcuong9125
Summary
This PR completes the streaming and fallback stabilization work for oc-go-cc, focused on long-running Claude Code sessions routed through OpenCode Go / Zen, especially Anthropic-native models such as minimax-m3 and qwen3.x.
It consolidates the recent fixes from:
- b573bc1: stream Go Anthropic-native models via raw /v1/messages
- 201aefd: harden transformTools against empty or malformed tool schemas
- 3a8fef7: emit tool_use content blocks correctly at index 0
- 86bb0ff: make timeouts context-driven, stop false fallback on cancellation, and prevent Anthropic raw stream corruption
Problem
We were seeing multiple production issues during real Claude Code usage:
- 502 all models failed responses.
- JSON Parse error: Unexpected EOF
- client disconnected during anthropic stream error="context canceled"
Root Causes
1. Hardcoded streaming timeout
handleStreaming used a hardcoded timeout derived from context.Background(), so attempt timeouts were not derived from the client request context.
2. Fallback loop ignored request cancellation
ExecuteWithFallback continued trying more models even after the parent request context was already canceled.
That produced:
- 502 all models failed responses on normal client cancellation
3. Streaming still depended on shorter transport/server limits
Even after fixing handler-level timeout logic, long streams could still be interrupted by:
- http.Client.Timeout
- http.Server.WriteTimeout
4. Anthropic raw SSE corruption
The proxy heartbeat was writing :keepalive\n\n to the same http.ResponseWriter while Anthropic raw streaming used io.Copy(w, resp.Body).
That allowed keepalive bytes to be injected between partial writes of an upstream SSE event, corrupting the event payload and causing Claude Code JSON parse failures.
What Changed
Added
- streaming_timeout_ms support for both opencode_go and opencode_zen
- RequestTimeout(model)
- StreamingTimeout(model)
Fixed
- 502 all models failed
- WriteTimeout
- http.Client.Timeout
Changed
timeout_ms streaming_timeout_ms timeout_ms streaming_timeout_ms streaming_timeout_ms
Refactored
- /v1/messages routing
- transformTools hardening
- tool_use index handling at content block 0
Tests
Added or updated coverage for:
Validation
Validated with:
go test ./...
Expected behavior after this PR:
- no false all models failed responses after normal client cancellation
Investigation Config
{
  "api_key": "${OC_GO_CC_API_KEY}",
  "host": "127.0.0.1",
  "port": 3456,
  "hot_reload": true,
  "enable_streaming_scenario_routing": false,
  "respect_requested_model": false,
  "models": {
    "background": { "provider": "opencode-go", "model_id": "deepseek-v4-flash", "temperature": 0.2, "max_tokens": 4096, "reasoning_effort": "high", "thinking": { "type": "enabled" } },
    "fast": { "provider": "opencode-go", "model_id": "deepseek-v4-flash", "temperature": 0.2, "max_tokens": 4096, "reasoning_effort": "high", "thinking": { "type": "enabled" } },
    "default": { "provider": "opencode-go", "model_id": "mimo-v2.5", "temperature": 0.3, "max_tokens": 8192, "reasoning_effort": "high", "thinking": { "type": "enabled" } },
    "think": { "provider": "opencode-go", "model_id": "deepseek-v4-pro", "temperature": 0.1, "max_tokens": 8192, "reasoning_effort": "max", "thinking": { "type": "enabled" } },
    "complex": { "provider": "opencode-go", "model_id": "deepseek-v4-pro", "temperature": 0.1, "max_tokens": 8192, "reasoning_effort": "max", "thinking": { "type": "enabled" } },
    "long_context": { "provider": "opencode-go", "model_id": "minimax-m3", "temperature": 0.3, "max_tokens": 16384, "reasoning_effort": "high", "thinking": { "type": "enabled" }, "context_threshold": 80000 }
  },
  "fallbacks": {
    "background": [
      { "provider": "opencode-go", "model_id": "mimo-v2.5", "temperature": 0.3, "max_tokens": 4096, "reasoning_effort": "high", "thinking": { "type": "enabled" } }
    ],
    "fast": [
      { "provider": "opencode-go", "model_id": "mimo-v2.5", "temperature": 0.3, "max_tokens": 4096, "reasoning_effort": "high", "thinking": { "type": "enabled" } }
    ],
    "default": [
      { "provider": "opencode-go", "model_id": "deepseek-v4-flash", "temperature": 0.2, "max_tokens": 4096, "reasoning_effort": "high", "thinking": { "type": "enabled" } },
      { "provider": "opencode-go", "model_id": "deepseek-v4-pro", "temperature": 0.1, "max_tokens": 8192, "reasoning_effort": "max", "thinking": { "type": "enabled" } }
    ],
    "think": [
      { "provider": "opencode-go", "model_id": "mimo-v2.5-pro", "temperature": 0.3, "max_tokens": 8192, "reasoning_effort": "high", "thinking": { "type": "enabled" } },
      { "provider": "opencode-go", "model_id": "minimax-m3", "temperature": 0.3, "max_tokens": 16384, "reasoning_effort": "high", "thinking": { "type": "enabled" } }
    ],
    "complex": [
      { "provider": "opencode-go", "model_id": "mimo-v2.5-pro", "temperature": 0.3, "max_tokens": 8192, "reasoning_effort": "high", "thinking": { "type": "enabled" } },
      { "provider": "opencode-go", "model_id": "minimax-m3", "temperature": 0.3, "max_tokens": 16384, "reasoning_effort": "high", "thinking": { "type": "enabled" } }
    ],
    "long_context": [
      { "provider": "opencode-go", "model_id": "deepseek-v4-pro", "temperature": 0.1, "max_tokens": 8192, "reasoning_effort": "max", "thinking": { "type": "enabled" } },
      { "provider": "opencode-go", "model_id": "mimo-v2.5-pro", "temperature": 0.3, "max_tokens": 8192, "reasoning_effort": "high", "thinking": { "type": "enabled" } }
    ]
  },
  "opencode_go": {
    "base_url": "https://opencode.ai/zen/go/v1/chat/completions",
    "anthropic_base_url": "https://opencode.ai/zen/go/v1/messages",
    "timeout_ms": 300000
  },
  "opencode_zen": {
    "base_url": "https://opencode.ai/zen/v1/chat/completions",
    "anthropic_base_url": "https://opencode.ai/zen/v1/messages",
    "responses_base_url": "https://opencode.ai/zen/v1/responses",
    "gemini_base_url": "https://opencode.ai/zen/v1/models",
    "timeout_ms": 300000
  },
  "logging": { "level": "info", "requests": false }
}
Reviewer Notes
Please build and run a final smoke test before merging, especially for the long-context streaming path:
go test ./...
make build
Recommended manual verification:
- tools=29+
- long_context -> minimax-m3
- Unexpected EOF
- all models failed