From fe14fabb159ff6651ba9d2524b7a9927033903de Mon Sep 17 00:00:00 2001 From: Jakevin Date: Mon, 15 Jun 2026 16:04:08 +0800 Subject: [PATCH] feat(dashboard): default mode. When the requested model is not on the whitelist, this model will be used instead. Leave it blank to reject requests outside the whitelist. --- src/dashboard/api.js | 37 +- src/dashboard/index.html | 16 +- src/dashboard/model-access.js | 10 + src/handlers/chat.js | 1375 +++++++++++++++++---------------- 4 files changed, 759 insertions(+), 679 deletions(-) diff --git a/src/dashboard/api.js b/src/dashboard/api.js index da496b73..684fbb54 100644 --- a/src/dashboard/api.js +++ b/src/dashboard/api.js @@ -30,7 +30,7 @@ import { getLogs, subscribeToLogs, unsubscribeFromLogs } from './logger.js'; import { getProxyConfig, getProxyConfigMasked, setGlobalProxy, setAccountProxy, removeProxy, getEffectiveProxy } from './proxy-config.js'; import { MODELS, MODEL_TIER_ACCESS as _TIER_TABLE, getTierModels as _getTierModels } from '../models.js'; import { windsurfLogin, refreshFirebaseToken, reRegisterWithCodeium } from './windsurf-login.js'; -import { getModelAccessConfig, setModelAccessMode, setModelAccessList, addModelToList, removeModelFromList } from './model-access.js'; +import { getModelAccessConfig, setModelAccessMode, setModelAccessList, addModelToList, removeModelFromList, setDefaultModel } from './model-access.js'; import { checkMessageRateLimit } from '../windsurf-api.js'; import { getNativeBridgeConfigStatus } from '../cascade-native-bridge.js'; import { getNativeBridgeStats } from '../native-bridge-stats.js'; @@ -1251,6 +1251,7 @@ export async function handleDashboardApi(method, subpath, body, req, res) { if (subpath === '/model-access' && method === 'PUT') { if (body.mode) setModelAccessMode(body.mode); if (body.list) setModelAccessList(body.list); + if (body.defaultModel !== undefined) setDefaultModel(body.defaultModel); return json(res, 200, { success: true, config: getModelAccessConfig() }); } @@ -1556,7 +1557,7 @@ async function gitStatus() { try { await runGit(['fetch', '--quiet', 'origin']); remote = (await runGit(['rev-parse', `origin/${branch}`])).trim(); - } catch {} + } catch { } const localMsg = (await runGit(['log', '-1', '--pretty=format:%s'])).trim(); const behind = remote && remote !== commit; const remoteMsg = behind ? (await runGit(['log', '-1', '--pretty=format:%s', remote]).catch(() => '')).trim() : ''; @@ -1611,21 +1612,21 @@ async function testProxy({ host, port, username, password, type }) { // TLS handshake + GET to verify the tunnel works return new Promise((resolve, reject) => { - const tlsSock = tls.connect({ socket, servername: targetHost, rejectUnauthorized: false }, () => { - tlsSock.write(`GET / HTTP/1.1\r\nHost: ${targetHost}\r\nConnection: close\r\nUser-Agent: WindsurfAPI/ProxyTest\r\n\r\n`); - }); - const chunks = []; - tlsSock.on('data', c => chunks.push(c)); - tlsSock.on('end', () => { - const body = Buffer.concat(chunks).toString('utf-8'); - const match = body.match(/\r\n\r\n([^\r\n]+)/); - const ip = match ? match[1].trim() : ''; - tlsSock.destroy(); - if (!ip || !/^\d+\.\d+\.\d+\.\d+$/.test(ip)) { - return reject(new Error('ERR_TLS_TUNNEL_ERROR')); - } - resolve({ egressIp: ip, type }); - }); - tlsSock.on('error', (err) => reject(new Error(`ERR_TLS_FAILED:${err.message}`))); + const tlsSock = tls.connect({ socket, servername: targetHost, rejectUnauthorized: false }, () => { + tlsSock.write(`GET / HTTP/1.1\r\nHost: ${targetHost}\r\nConnection: close\r\nUser-Agent: WindsurfAPI/ProxyTest\r\n\r\n`); + }); + const chunks = []; + tlsSock.on('data', c => chunks.push(c)); + tlsSock.on('end', () => { + const body = Buffer.concat(chunks).toString('utf-8'); + const match = body.match(/\r\n\r\n([^\r\n]+)/); + const ip = match ? match[1].trim() : ''; + tlsSock.destroy(); + if (!ip || !/^\d+\.\d+\.\d+\.\d+$/.test(ip)) { + return reject(new Error('ERR_TLS_TUNNEL_ERROR')); + } + resolve({ egressIp: ip, type }); + }); + tlsSock.on('error', (err) => reject(new Error(`ERR_TLS_FAILED:${err.message}`))); }); } diff --git a/src/dashboard/index.html b/src/dashboard/index.html index efe6f795..654ddbd2 100644 --- a/src/dashboard/index.html +++ b/src/dashboard/index.html @@ -2013,6 +2013,11 @@

模型控制

+
+ + +
当请求的模型不在白名单中时,将使用此模型代替。留空则拒绝白名单外的请求。
+
@@ -4598,6 +4603,7 @@

控制台登录

this.allModels = modelData.models || []; this.modelAccessConfig = accessData.mode ? accessData : { mode: 'all', list: [] }; document.querySelector(`input[name="model-mode"][value="${this.modelAccessConfig.mode}"]`).checked = true; + document.getElementById('default-model-input').value = this.modelAccessConfig.defaultModel || ''; this.updateModelListUI(); }, @@ -4653,12 +4659,20 @@

控制台登录

}, async setModelMode(mode) { - await this.api('PUT', '/model-access', { mode }); + const defaultModel = document.getElementById('default-model-input').value.trim(); + await this.api('PUT', '/model-access', { mode, defaultModel }); this.modelAccessConfig.mode = mode; + this.modelAccessConfig.defaultModel = defaultModel; this.updateModelListUI(); this.toast(I18n.t('toast.modeUpdated')); }, + async handleDefaultModelInput() { + const defaultModel = document.getElementById('default-model-input').value.trim(); + await this.api('PUT', '/model-access', { defaultModel }); + this.modelAccessConfig.defaultModel = defaultModel; + }, + async toggleModelInList(modelId) { const idx = this.modelAccessConfig.list.indexOf(modelId); if (idx > -1) { diff --git a/src/dashboard/model-access.js b/src/dashboard/model-access.js index 0aac7380..a47a3c1f 100644 --- a/src/dashboard/model-access.js +++ b/src/dashboard/model-access.js @@ -14,6 +14,7 @@ const ACCESS_FILE = join(config.dataDir, 'model-access.json'); const _config = { mode: 'all', list: [], // model IDs in the list + defaultModel: '', // default model to use if the requested one is blocked or not specified; optional, for UI purposes }; // Load @@ -60,6 +61,15 @@ export function removeModelFromList(modelId) { save(); } +export function setDefaultModel(modelId) { + _config.defaultModel = modelId || ''; + save(); +} + +export function getDefaultModel() { + return _config.defaultModel || ''; +} + /** * Some models in the catalog are simply the reasoning-mode variant of a * base model (claude-opus-4.6 vs claude-opus-4.6-thinking). For diff --git a/src/handlers/chat.js b/src/handlers/chat.js index 64e431cd..7441f239 100644 --- a/src/handlers/chat.js +++ b/src/handlers/chat.js @@ -7,7 +7,7 @@ import { createHash, randomUUID } from 'crypto'; import { WindsurfClient, contentToString, isCascadeTransportError } from '../client.js'; import { getApiKey, acquireAccountByKey, releaseAccount, getAccountAvailability, reportError, reportSuccess, markRateLimited, reportInternalError, updateCapability, getAccountList, isAllRateLimited, isAllTemporarilyUnavailable, refundReservation, looksLikeBanSignal, reportBanSignal, clearBanSignals, isModelBlockedByDrought, getDroughtSummary } from '../auth.js'; import { isStickyEnabled, setStickyBinding } from '../account/sticky-session.js'; -import { resolveModel, getModelInfo, pickRateLimitFallback } from '../models.js'; +import { resolveModel, getModelInfo, pickRateLimitFallback, MODELS } from '../models.js'; import { getLsFor, ensureLs } from '../langserver.js'; import { config, log } from '../config.js'; import { safeAccountRef, safeKeyRef } from '../log-safety.js'; @@ -1650,7 +1650,7 @@ async function _handleChatCompletionsInner(body, context = {}) { const c = typeof m?.content === 'string' ? m.content : Array.isArray(m?.content) ? m.content.map(p => p?.type === 'text' ? p.text : `[${p?.type}]`).join('|') : ''; log.info(`Probe[${reqId}] msg[${mi}] role=${m?.role} ${requestLogSummary(c)}`); } - } catch {} + } catch { } // Reject pathologically empty user turns. Without this, an empty // `user.content` slips through and the model answers against the @@ -1697,7 +1697,7 @@ async function _handleChatCompletionsInner(body, context = {}) { return n + (typeof c === 'string' ? c.length : Array.isArray(c) ? c.reduce((k, p) => k + (typeof p?.text === 'string' ? p.text.length : 0), 0) : 0); }, 0); if (sysBytes >= 8000) { - log.warn(`Probe[${reqId}]: large system prompt ${Math.round(sysBytes/1024)}KB — heavy clients (OpenClaw / Cline / opencode) may hit upstream panel-state retries above ~30KB`); + log.warn(`Probe[${reqId}]: large system prompt ${Math.round(sysBytes / 1024)}KB — heavy clients (OpenClaw / Cline / opencode) may hit upstream panel-state retries above ~30KB`); } } @@ -1715,25 +1715,35 @@ async function _handleChatCompletionsInner(body, context = {}) { } else if (wantThinking && isOpus47ModelKey(modelKey) && getModelInfo(modelKey + '-thinking') && !isOpus47ThinkingAutoRouteEnabled()) { log.warn(`Chat[${reqId}]: Opus 4.7 thinking auto-route disabled; using base model ${modelKey}. Upstream LS rejects ${modelKey}-thinking as model not found. Set WINDSURFAPI_OPUS47_THINKING_UIDS=1 only after upstream registers it.`); } - const routingModelKey = effectiveModelKey; - const modelInfo = getModelInfo(effectiveModelKey) || getModelInfo(modelKey); + let routingModelKey = effectiveModelKey; + let modelInfo = getModelInfo(effectiveModelKey) || getModelInfo(modelKey); // Reject unknown models. Without this, chat.js used to fall through to // legacy rawGetChatMessage with modelEnum=0 and modelUid=null, which // upstream silently routed to a default model. Callers saw "I'm Claude 4.5" // when they asked for `claude-4.6` (issue #68), or got blank responses for // typos. Fail fast with the same shape OpenAI uses. + // However, if a default model is configured in model access control, + // use that instead of rejecting. if (!modelInfo) { - return { - status: 400, - body: { - error: { - message: `Unsupported model: ${reqModel || config.defaultModel}`, - type: 'invalid_request_error', - param: 'model', - code: 'model_not_found', + const { defaultModel } = isModelAllowed(routingModelKey); + if (defaultModel) { + log.info(`Chat[${reqId}]: model ${routingModelKey} not found in catalog, using default model ${defaultModel}`); + routingModelKey = defaultModel; + modelInfo = MODELS[routingModelKey]; + } + if (!modelInfo) { + return { + status: 400, + body: { + error: { + message: `Unsupported model: ${reqModel || config.defaultModel}`, + type: 'invalid_request_error', + param: 'model', + code: 'model_not_found', + }, }, - }, - }; + }; + } } // Return the user's original model name in response.model / response headers // so external test harnesses (e.g. hvoy.ai "model signature" check) see @@ -1750,6 +1760,15 @@ async function _handleChatCompletionsInner(body, context = {}) { return { status: 403, body: { error: { message: access.reason, type: 'model_blocked' } } }; } + // If default model is set and the requested model is not in allowlist, + // use the default model instead + if (access.defaultModel) { + log.info(`Chat[${reqId}]: model ${routingModelKey} not in allowlist, using default model ${access.defaultModel}`); + routingModelKey = access.defaultModel; + modelInfo = MODELS[routingModelKey]; + displayModel = routingModelKey; + } + if (isSpecialAgentModelInfo(modelInfo)) { log.info(`Chat[${reqId}]: routing ${routingModelKey} through special-agent backend`); return handleSpecialAgentChatCompletion(body, { @@ -1819,8 +1838,8 @@ async function _handleChatCompletionsInner(body, context = {}) { : []; const nativeAllowlist = nativeBridgeOn ? Array.from(new Set(toolPartition.mapped - .map(t => nativeAllowlistNameForTool(t?.function?.name)) - .filter(Boolean))) + .map(t => nativeAllowlistNameForTool(t?.function?.name)) + .filter(Boolean))) : []; // Tools we ship to the emulation toolPreamble: the unmapped subset when // bridge is on, or the full tools[] when bridge is off (legacy behaviour). @@ -1941,7 +1960,7 @@ async function _handleChatCompletionsInner(body, context = {}) { for (const m of (messages || [])) { const c = typeof m?.content === 'string' ? m.content : Array.isArray(m?.content) ? m.content.filter(p => p?.type === 'text').map(p => p.text || '').join('\n') - : ''; + : ''; const hit = c.match(/[^.\n]{0,40}(?:working directory|cwd||)[^.\n]{0,80}/i); if (hit) { probe = hit[0].replace(/\s+/g, ' ').slice(0, 160); break; } } @@ -2122,19 +2141,19 @@ async function _handleChatCompletionsInner(body, context = {}) { wantJson, callerKey, { - checkMessageRateLimit: checkMessageRateLimitFn, - waitForAccount: waitForAccountFn, - cachePolicy, - wantThinking, - fpOpts: buildReuseOpts({ tools: effectiveTools, toolChoice: tool_choice, toolPreamble, preambleTier, emulateTools, route: body.__route || 'chat' }), - tools: effectiveTools, - route: body.__route || 'chat', - nativeOpts, - context, - ensureLs: context.ensureLs, - getLsFor: context.getLsFor, - WindsurfClient: context.WindsurfClient, - }); + checkMessageRateLimit: checkMessageRateLimitFn, + waitForAccount: waitForAccountFn, + cachePolicy, + wantThinking, + fpOpts: buildReuseOpts({ tools: effectiveTools, toolChoice: tool_choice, toolPreamble, preambleTier, emulateTools, route: body.__route || 'chat' }), + tools: effectiveTools, + route: body.__route || 'chat', + nativeOpts, + context, + ensureLs: context.ensureLs, + getLsFor: context.getLsFor, + WindsurfClient: context.WindsurfClient, + }); } // ── Local response cache (exact body match) ───────────── @@ -2256,8 +2275,8 @@ async function _handleChatCompletionsInner(body, context = {}) { const reason = tempUnavail.allUnavailable ? `所有可用账号暂时不可用,请 ${Math.ceil(tempUnavail.retryAfterMs / 1000)} 秒后重试` : rateLimited.allLimited - ? `所有可用账号均已达速率限制,请 ${Math.ceil(rateLimited.retryAfterMs / 1000)} 秒后重试` - : `${Math.ceil(QUEUE_MAX_WAIT_MS / 1000)} 秒内没有账号变为可用 — 账号可能被速率限制或对当前模型无权限`; + ? `所有可用账号均已达速率限制,请 ${Math.ceil(rateLimited.retryAfterMs / 1000)} 秒后重试` + : `${Math.ceil(QUEUE_MAX_WAIT_MS / 1000)} 秒内没有账号变为可用 — 账号可能被速率限制或对当前模型无权限`; lastErr = { status: (tempUnavail.allUnavailable || rateLimited.allLimited) ? 429 : 503, body: { error: { message: `${displayModel} 账号队列超时: ${reason}`, type: (tempUnavail.allUnavailable || rateLimited.allLimited) ? 'rate_limit_exceeded' : 'pool_exhausted' } }, @@ -2284,179 +2303,179 @@ async function _handleChatCompletionsInner(body, context = {}) { log.info(`Chat[${reqId}]: native bridge account gate skipped ${safeAccountRef(acct)}`); continue; } - // Pre-flight rate limit check (experimental): ask server.codeium.com if - // this account still has message capacity before burning an LS round trip. - if (isExperimentalEnabled('preflightRateLimit')) { - try { - const px = getEffectiveProxy(acct.id) || null; - const rl = await checkMessageRateLimitFn(acct.apiKey, px); - if (!rl.hasCapacity) { - log.warn(`Preflight: ${safeAccountRef(acct)} has no capacity (remaining=${rl.messagesRemaining}), skipping`); - refundReservation(acct.apiKey, acct.reservationTimestamp); - if (Number.isFinite(rl.retryAfterMs) && rl.retryAfterMs > 0) { - markRateLimited(acct.apiKey, rl.retryAfterMs, routingModelKey); - } - if (!reuseEntryDead && strictReuse && checkedOutReuseEntry && fpBefore && checkedOutReuseEntry.apiKey === acct.apiKey) { - const availability = getAccountAvailability(acct.apiKey, routingModelKey); - const retryAfterMs = strictReuseRetryMs(availability); - poolCheckin(fpBefore, checkedOutReuseEntry, callerKey, ttlHintFromCachePolicy(cachePolicy)); - log.info(`Chat[${reqId}]: strict reuse preserved cascade after preflight rate limit`); - return { - status: 429, - headers: { 'Retry-After': String(Math.ceil(retryAfterMs / 1000)) }, - body: { - error: { - message: strictReuseMessage(displayModel, retryAfterMs, availability.reason), - type: 'rate_limit_exceeded', - retry_after_ms: retryAfterMs, + // Pre-flight rate limit check (experimental): ask server.codeium.com if + // this account still has message capacity before burning an LS round trip. + if (isExperimentalEnabled('preflightRateLimit')) { + try { + const px = getEffectiveProxy(acct.id) || null; + const rl = await checkMessageRateLimitFn(acct.apiKey, px); + if (!rl.hasCapacity) { + log.warn(`Preflight: ${safeAccountRef(acct)} has no capacity (remaining=${rl.messagesRemaining}), skipping`); + refundReservation(acct.apiKey, acct.reservationTimestamp); + if (Number.isFinite(rl.retryAfterMs) && rl.retryAfterMs > 0) { + markRateLimited(acct.apiKey, rl.retryAfterMs, routingModelKey); + } + if (!reuseEntryDead && strictReuse && checkedOutReuseEntry && fpBefore && checkedOutReuseEntry.apiKey === acct.apiKey) { + const availability = getAccountAvailability(acct.apiKey, routingModelKey); + const retryAfterMs = strictReuseRetryMs(availability); + poolCheckin(fpBefore, checkedOutReuseEntry, callerKey, ttlHintFromCachePolicy(cachePolicy)); + log.info(`Chat[${reqId}]: strict reuse preserved cascade after preflight rate limit`); + return { + status: 429, + headers: { 'Retry-After': String(Math.ceil(retryAfterMs / 1000)) }, + body: { + error: { + message: strictReuseMessage(displayModel, retryAfterMs, availability.reason), + type: 'rate_limit_exceeded', + retry_after_ms: retryAfterMs, + }, }, - }, - }; + }; + } + continue; } - continue; + } catch (e) { + log.debug(`Preflight check failed for ${safeAccountRef(acct)}: ${e.message}`); + // Fail open — proceed with the request } - } catch (e) { - log.debug(`Preflight check failed for ${safeAccountRef(acct)}: ${e.message}`); - // Fail open — proceed with the request } - } - try { await ensureLsFn(acct.proxy); } catch (e) { - lastErr = isLsPoolExhausted(e) ? lsPoolExhaustedResponse(e) : { status: e.status || 503, body: { error: { message: e.message || String(e), type: e.type || 'ls_unavailable' } } }; - break; - } - const ls = getLsForFn(acct.proxy); - if (!ls) { lastErr = { status: 503, body: { error: { message: 'No LS instance available', type: 'ls_unavailable' } } }; break; } - // Cascade pins cascade_id to a specific LS port too; if the LS it was - // born on has been replaced, the cascade_id is dead. - if (reuseEntry && reuseEntry.lsPort !== ls.port) { - log.info(`Chat[${reqId}]: reuse MISS — LS port changed`); - checkedOutReuseEntry = null; - reuseEntry = null; - } - const _msgChars = (messages || []).reduce((n, m) => { - const c = m?.content; - return n + (typeof c === 'string' ? c.length : Array.isArray(c) ? c.reduce((k, p) => k + (typeof p?.text === 'string' ? p.text.length : 0), 0) : 0); - }, 0); - log.info(`Chat[${reqId}]: model=${displayModel} flow=${useCascade ? 'cascade' : 'legacy'} attempt=${attempt + 1} ${safeAccountRef(acct)} ls=${ls.port} turns=${(messages||[]).length} chars=${_msgChars}${reuseEntry ? ' reuse=1' : ''}${emulateTools ? ' tools=emu' : ''}`); - const client = new WindsurfClientClass(acct.apiKey, ls.port, ls.csrfToken); - const result = await nonStreamResponse( - client, chatId, created, displayModel, routingModelKey, messages, cascadeMessages, modelEnum, modelUid, - useCascade, acct.apiKey, ckey, - // v2.0.87 (#129) — aliasModelKey is set by the outer wrapper - // when this handler is the second pass of an auto-fallback - // retry; it carries the ORIGINAL model name the client asked - // for so the cascade pool entry gets indexed under both keys. - reuseEnabled ? { reuseEntry, lsPort: ls.port, apiKey: acct.apiKey, accountId: acct.id, callerKey, cachePolicy, fpOpts, aliasModelKey: context.__aliasModelKey || null } : null, - modelInfo?.provider || null, - emulateTools, toolPreamble, wantJson, cachePolicy, wantThinking, tools, body.__route || 'chat', - nativeOpts, - // v2.0.88 (audit H-3) — when this is the inner-pass of an auto- - // fallback retry, the outer wrapper threads the ORIGINAL request's - // ckey through context.__originalCkey. We pass it down so the - // success-path cacheSet writes under the original key as well — - // next identical original-model request hits cache instead of - // re-burning the rate-limit + fallback cycle. - reqId, - context.__originalCkey || null, - ); - if (result.status === 200) return result; - reuseEntry = null; // don't try to reuse on the retry - if (result.reuseEntryInvalid) reuseEntryDead = true; - // #101: same upstream-timeout invalidation as the stream path — - // see the matching catch block in streamResponse for the full - // rationale (cascade trajectory left half-broken, next reuse hits - // it and the model "loses" the prior conversation). - const _resultError = result.body?.error || {}; - const _resultMsg = String(_resultError.upstream_message || _resultError.message || ''); - if ( - result.status === 504 - || _resultError.type === 'upstream_deadline_exceeded' - || _resultError.code === 'windsurf_provider_deadline' - || isUpstreamDeadlineExceeded(_resultMsg) - ) { - reuseEntryDead = true; - } - lastErr = result; - const errType = result.body?.error?.type; - // v2.0.61 (#113): policy_blocked → don't rotate accounts, return - // immediately. The model refused the request, swapping accounts - // gives the same refusal but burns more quota. - if (errType === 'policy_blocked') { recordPolicyBlocked(); return result; } - // Rate limit: this account is done for this model, try the next one - if (errType === 'rate_limit_exceeded') { - recordRateLimited(); - // v2.0.91 — IP-level circuit breaker: when Windsurf upstream - // rate-limits several accounts for the same model in a tight - // window, it's usually IP-wide cooldown, not per-account. - // Burning through all 40+ accounts just marks them all dead - // for 30 min. Detect and short-circuit. - if (!context.__rateLimitEvents) context.__rateLimitEvents = []; - const RL_WINDOW_MS = 8_000; - const RL_BURST_THRESHOLD = 3; - context.__rateLimitEvents.push({ - time: Date.now(), - model: routingModelKey, - account: acct.id, - cooldownMs: rateLimitBurstCooldownMs({ - message: result.body?.error?.message || '', - retryAfterMs: result.body?.error?.retry_after_ms, - apiKey: acct.apiKey, - modelKey: routingModelKey, - }), - }); - // Prune old events - const cutoff = Date.now() - RL_WINDOW_MS; - while (context.__rateLimitEvents.length && context.__rateLimitEvents[0].time < cutoff) { - context.__rateLimitEvents.shift(); + try { await ensureLsFn(acct.proxy); } catch (e) { + lastErr = isLsPoolExhausted(e) ? lsPoolExhaustedResponse(e) : { status: e.status || 503, body: { error: { message: e.message || String(e), type: e.type || 'ls_unavailable' } } }; + break; } - const sameModelBurst = context.__rateLimitEvents.filter(e => e.model === routingModelKey); - if (sameModelBurst.length >= RL_BURST_THRESHOLD) { - const maxCooldown = Math.max(...sameModelBurst.map(e => e.cooldownMs || IP_RATE_LIMIT_BURST_FLOOR_MS)); - log.warn(`Chat[${reqId}]: IP-rate-limit burst detected — ${sameModelBurst.length} accounts rate-limited on ${displayModel} within ${RL_WINDOW_MS}ms. Short-circuiting.`); - return { - status: 429, - headers: { 'Retry-After': String(Math.ceil(maxCooldown / 1000)) }, - body: { - error: { - message: `All accounts temporarily rate-limited on ${displayModel}. Windsurf upstream is applying IP-level cooldown. Wait ${formatRetryAfter(maxCooldown)} before retrying, or switch to a different model.`, - type: 'rate_limit_exceeded', - retry_after_ms: maxCooldown, + const ls = getLsForFn(acct.proxy); + if (!ls) { lastErr = { status: 503, body: { error: { message: 'No LS instance available', type: 'ls_unavailable' } } }; break; } + // Cascade pins cascade_id to a specific LS port too; if the LS it was + // born on has been replaced, the cascade_id is dead. + if (reuseEntry && reuseEntry.lsPort !== ls.port) { + log.info(`Chat[${reqId}]: reuse MISS — LS port changed`); + checkedOutReuseEntry = null; + reuseEntry = null; + } + const _msgChars = (messages || []).reduce((n, m) => { + const c = m?.content; + return n + (typeof c === 'string' ? c.length : Array.isArray(c) ? c.reduce((k, p) => k + (typeof p?.text === 'string' ? p.text.length : 0), 0) : 0); + }, 0); + log.info(`Chat[${reqId}]: model=${displayModel} flow=${useCascade ? 'cascade' : 'legacy'} attempt=${attempt + 1} ${safeAccountRef(acct)} ls=${ls.port} turns=${(messages || []).length} chars=${_msgChars}${reuseEntry ? ' reuse=1' : ''}${emulateTools ? ' tools=emu' : ''}`); + const client = new WindsurfClientClass(acct.apiKey, ls.port, ls.csrfToken); + const result = await nonStreamResponse( + client, chatId, created, displayModel, routingModelKey, messages, cascadeMessages, modelEnum, modelUid, + useCascade, acct.apiKey, ckey, + // v2.0.87 (#129) — aliasModelKey is set by the outer wrapper + // when this handler is the second pass of an auto-fallback + // retry; it carries the ORIGINAL model name the client asked + // for so the cascade pool entry gets indexed under both keys. + reuseEnabled ? { reuseEntry, lsPort: ls.port, apiKey: acct.apiKey, accountId: acct.id, callerKey, cachePolicy, fpOpts, aliasModelKey: context.__aliasModelKey || null } : null, + modelInfo?.provider || null, + emulateTools, toolPreamble, wantJson, cachePolicy, wantThinking, tools, body.__route || 'chat', + nativeOpts, + // v2.0.88 (audit H-3) — when this is the inner-pass of an auto- + // fallback retry, the outer wrapper threads the ORIGINAL request's + // ckey through context.__originalCkey. We pass it down so the + // success-path cacheSet writes under the original key as well — + // next identical original-model request hits cache instead of + // re-burning the rate-limit + fallback cycle. + reqId, + context.__originalCkey || null, + ); + if (result.status === 200) return result; + reuseEntry = null; // don't try to reuse on the retry + if (result.reuseEntryInvalid) reuseEntryDead = true; + // #101: same upstream-timeout invalidation as the stream path — + // see the matching catch block in streamResponse for the full + // rationale (cascade trajectory left half-broken, next reuse hits + // it and the model "loses" the prior conversation). + const _resultError = result.body?.error || {}; + const _resultMsg = String(_resultError.upstream_message || _resultError.message || ''); + if ( + result.status === 504 + || _resultError.type === 'upstream_deadline_exceeded' + || _resultError.code === 'windsurf_provider_deadline' + || isUpstreamDeadlineExceeded(_resultMsg) + ) { + reuseEntryDead = true; + } + lastErr = result; + const errType = result.body?.error?.type; + // v2.0.61 (#113): policy_blocked → don't rotate accounts, return + // immediately. The model refused the request, swapping accounts + // gives the same refusal but burns more quota. + if (errType === 'policy_blocked') { recordPolicyBlocked(); return result; } + // Rate limit: this account is done for this model, try the next one + if (errType === 'rate_limit_exceeded') { + recordRateLimited(); + // v2.0.91 — IP-level circuit breaker: when Windsurf upstream + // rate-limits several accounts for the same model in a tight + // window, it's usually IP-wide cooldown, not per-account. + // Burning through all 40+ accounts just marks them all dead + // for 30 min. Detect and short-circuit. + if (!context.__rateLimitEvents) context.__rateLimitEvents = []; + const RL_WINDOW_MS = 8_000; + const RL_BURST_THRESHOLD = 3; + context.__rateLimitEvents.push({ + time: Date.now(), + model: routingModelKey, + account: acct.id, + cooldownMs: rateLimitBurstCooldownMs({ + message: result.body?.error?.message || '', + retryAfterMs: result.body?.error?.retry_after_ms, + apiKey: acct.apiKey, + modelKey: routingModelKey, + }), + }); + // Prune old events + const cutoff = Date.now() - RL_WINDOW_MS; + while (context.__rateLimitEvents.length && context.__rateLimitEvents[0].time < cutoff) { + context.__rateLimitEvents.shift(); + } + const sameModelBurst = context.__rateLimitEvents.filter(e => e.model === routingModelKey); + if (sameModelBurst.length >= RL_BURST_THRESHOLD) { + const maxCooldown = Math.max(...sameModelBurst.map(e => e.cooldownMs || IP_RATE_LIMIT_BURST_FLOOR_MS)); + log.warn(`Chat[${reqId}]: IP-rate-limit burst detected — ${sameModelBurst.length} accounts rate-limited on ${displayModel} within ${RL_WINDOW_MS}ms. Short-circuiting.`); + return { + status: 429, + headers: { 'Retry-After': String(Math.ceil(maxCooldown / 1000)) }, + body: { + error: { + message: `All accounts temporarily rate-limited on ${displayModel}. Windsurf upstream is applying IP-level cooldown. Wait ${formatRetryAfter(maxCooldown)} before retrying, or switch to a different model.`, + type: 'rate_limit_exceeded', + retry_after_ms: maxCooldown, + }, }, - }, - }; + }; + } + if (acct?._sticky && isExperimentalEnabled('stickyNoFallback')) { + log.warn(`Account ${safeAccountRef(acct)} (sticky-bound) rate-limited on ${displayModel}, stickyNoFallback enabled — not trying other accounts`); + break; + } + log.warn(`Account ${safeAccountRef(acct)} rate-limited on ${displayModel}, trying next account`); + continue; } - if (acct?._sticky && isExperimentalEnabled('stickyNoFallback')) { - log.warn(`Account ${safeAccountRef(acct)} (sticky-bound) rate-limited on ${displayModel}, stickyNoFallback enabled — not trying other accounts`); + // Cascade transient 错误通常是上游或本地 LS 短暂抖动,先退避再切账号,避免连续打爆同一热窗口。 + if (errType === 'upstream_deadline_exceeded') { break; } - log.warn(`Account ${safeAccountRef(acct)} rate-limited on ${displayModel}, trying next account`); - continue; - } - // Cascade transient 错误通常是上游或本地 LS 短暂抖动,先退避再切账号,避免连续打爆同一热窗口。 - if (errType === 'upstream_deadline_exceeded') { - break; - } - if (errType === 'upstream_internal_error' || errType === 'upstream_transient_error') { - if (acct?._sticky && isExperimentalEnabled('stickyNoFallback')) { - log.warn(`Chat[${reqId}]: ${safeAccountRef(acct)} (sticky-bound) upstream transient error, stickyNoFallback enabled — not trying other accounts`); - break; + if (errType === 'upstream_internal_error' || errType === 'upstream_transient_error') { + if (acct?._sticky && isExperimentalEnabled('stickyNoFallback')) { + log.warn(`Chat[${reqId}]: ${safeAccountRef(acct)} (sticky-bound) upstream transient error, stickyNoFallback enabled — not trying other accounts`); + break; + } + internalCount++; + const backoffMs = await internalErrorBackoff(internalCount - 1); + log.warn(`Chat[${reqId}]: ${safeAccountRef(acct)} upstream transient error, waited ${backoffMs}ms before next account`); + continue; } - internalCount++; - const backoffMs = await internalErrorBackoff(internalCount - 1); - log.warn(`Chat[${reqId}]: ${safeAccountRef(acct)} upstream transient error, waited ${backoffMs}ms before next account`); - continue; - } - // Model not available on this account (permission_denied, etc.) - if (errType === 'model_not_available') { - if (acct?._sticky && isExperimentalEnabled('stickyNoFallback')) { - log.warn(`Account ${safeAccountRef(acct)} (sticky-bound) cannot serve ${displayModel}, stickyNoFallback enabled — not trying other accounts`); - break; + // Model not available on this account (permission_denied, etc.) + if (errType === 'model_not_available') { + if (acct?._sticky && isExperimentalEnabled('stickyNoFallback')) { + log.warn(`Account ${safeAccountRef(acct)} (sticky-bound) cannot serve ${displayModel}, stickyNoFallback enabled — not trying other accounts`); + break; + } + log.warn(`Account ${safeAccountRef(acct)} cannot serve ${displayModel}, trying next account`); + continue; } - log.warn(`Account ${safeAccountRef(acct)} cannot serve ${displayModel}, trying next account`); - continue; - } - break; // other errors (502, transport) — don't retry + break; // other errors (502, transport) — don't retry } finally { // Pair every successful getApiKey/acquireAccountByKey with a release // so the in-flight-count based balancer in auth.js (issue #37) stays @@ -2728,12 +2747,12 @@ async function nonStreamResponse(client, id, created, model, modelKey, messages, // WINDSURFAPI_NLU_RETRY=0 to disable globally. const nluRetryEnabled = process.env.WINDSURFAPI_NLU_RETRY !== '0' && (process.env.WINDSURFAPI_NLU_RETRY === '1' - || /zhipu|glm|moonshot|kimi/i.test(String(provider || '')) - || /^(?:glm|kimi)/i.test(String(modelKey || ''))); + || /zhipu|glm|moonshot|kimi/i.test(String(provider || '')) + || /^(?:glm|kimi)/i.test(String(modelKey || ''))); if (toolCalls.length === 0 - && nluRetryEnabled - && Array.isArray(tools) && tools.length > 0 - && narrativeSource) { + && nluRetryEnabled + && Array.isArray(tools) && tools.length > 0 + && narrativeSource) { const lastUser = latestRealUserText(messages) || ''; const intendedTool = detectToolIntentInNarrative(narrativeSource, tools, { lastUserText: lastUser }); if (intendedTool) { @@ -2746,12 +2765,14 @@ async function nonStreamResponse(client, id, created, model, modelKey, messages, const correctionMessages = [ ...cascadeMessages, { role: 'assistant', content: narrativeSource.slice(0, 4000) }, - { role: 'user', content: - `Your previous response described intending to call \`${intendedTool}\` but didn't emit the tool-call protocol block. ` + - `Re-emit the call now using the EXACT protocol format defined at the top of this conversation. ` + - `Do NOT narrate. Do NOT describe. Just the protocol block. ` + - `Provide a concrete argument value (the literal command / file path / query) — never placeholders like "command" or "the file". ` + - `\n\n你刚才描述了想用 \`${intendedTool}\` 工具但没按协议格式 emit。请直接重新 emit 协议块,不要 narrate。给具体的 argument 字面值(如 ls / /etc/hostname / "echo hi"),不要写"命令" / "文件" 这种占位词。` }, + { + role: 'user', content: + `Your previous response described intending to call \`${intendedTool}\` but didn't emit the tool-call protocol block. ` + + `Re-emit the call now using the EXACT protocol format defined at the top of this conversation. ` + + `Do NOT narrate. Do NOT describe. Just the protocol block. ` + + `Provide a concrete argument value (the literal command / file path / query) — never placeholders like "command" or "the file". ` + + `\n\n你刚才描述了想用 \`${intendedTool}\` 工具但没按协议格式 emit。请直接重新 emit 协议块,不要 narrate。给具体的 argument 字面值(如 ls / /etc/hostname / "echo hi"),不要写"命令" / "文件" 这种占位词。` + }, ]; log.info(`Chat[non-stream]: NLU retry — first pass narrate-only, retrying with correction (tool=${intendedTool} markers=${markers.join(',') || 'none'})`); const retryChunks = await client.cascadeChat(correctionMessages, modelEnum, modelUid, { @@ -2930,9 +2951,9 @@ async function nonStreamResponse(client, id, created, model, modelKey, messages, // v2.0.91 — kimi-k2 upstream outage. Cascade returns idle_empty // (null content, ~1-6 tokens). Return clear error with alternatives. if (/^kimi/i.test(String(modelKey || '')) - && !toolCalls.length - && (!allText || allText.trim().length === 0) - && (!allThinking || allThinking.trim().length === 0)) { + && !toolCalls.length + && (!allText || allText.trim().length === 0) + && (!allThinking || allThinking.trim().length === 0)) { return { status: 502, body: { @@ -2979,7 +3000,7 @@ async function nonStreamResponse(client, id, created, model, modelKey, messages, const usage = buildUsageBody(serverUsage, messages, allText, allThinking, cachePolicy); // v2.0.69 (#118): feed bucket totals into stats so dashboard can show // fresh_input vs cache_read vs cache_write breakdown. - try { recordTokenUsage(usage); } catch {} + try { recordTokenUsage(usage); } catch { } const finishReason = toolCalls.length ? 'tool_calls' : 'stop'; return { status: 200, @@ -3058,10 +3079,12 @@ async function nonStreamResponse(client, id, created, model, modelKey, messages, if (chars > 500_000) { return { status: 413, - body: { error: { - message: `请求过大(${Math.round(chars / 1024)}KB 输入)导致语言服务器中断。请尝试:1) 分块发送;2) 先用摘要/summarization 预处理 PDF;3) 减少历史轮数`, - type: 'payload_too_large', - } }, + body: { + error: { + message: `请求过大(${Math.round(chars / 1024)}KB 输入)导致语言服务器中断。请尝试:1) 分块发送;2) 先用摘要/summarization 预处理 PDF;3) 减少历史轮数`, + type: 'payload_too_large', + } + }, }; } } @@ -3082,12 +3105,14 @@ async function nonStreamResponse(client, id, created, model, modelKey, messages, return { status: isTransient ? 502 : (err.isModelError ? 403 : 502), reuseEntryInvalid: !!err.reuseEntryInvalid, - body: { error: { - message: isTransient - ? upstreamTransientErrorMessage(model, 1, isTransport ? 'cascade_transport' : 'internal_error') - : sanitizeText(err.message), - type: isTransient ? 'upstream_transient_error' : (err.isModelError ? 'model_not_available' : 'upstream_error'), - } }, + body: { + error: { + message: isTransient + ? upstreamTransientErrorMessage(model, 1, isTransport ? 'cascade_transport' : 'internal_error') + : sanitizeText(err.message), + type: isTransient ? 'upstream_transient_error' : (err.isModelError ? 'model_not_available' : 'upstream_error'), + } + }, }; } } @@ -3132,7 +3157,7 @@ function streamResponse(id, created, model, modelKey, provider, messages, cascad }, async handler(res) { const abortController = new AbortController(); - let unregisterSse = () => {}; + let unregisterSse = () => { }; res.on('close', () => { if (!res.writableEnded) { log.info('Client disconnected mid-stream, aborting upstream'); @@ -3169,20 +3194,30 @@ function streamResponse(id, created, model, modelKey, provider, messages, cascad log.info(`Chat: cache HIT model=${model} flow=stream`); recordRequest(model, true, 0, null); try { - send({ id, object: 'chat.completion.chunk', created, model, - choices: [{ index: 0, delta: { role: 'assistant', content: '' }, finish_reason: null }] }); + send({ + id, object: 'chat.completion.chunk', created, model, + choices: [{ index: 0, delta: { role: 'assistant', content: '' }, finish_reason: null }] + }); if (cached.thinking) { - send({ id, object: 'chat.completion.chunk', created, model, - choices: [{ index: 0, delta: { reasoning_content: cached.thinking }, finish_reason: null }] }); + send({ + id, object: 'chat.completion.chunk', created, model, + choices: [{ index: 0, delta: { reasoning_content: cached.thinking }, finish_reason: null }] + }); } if (cached.text) { - send({ id, object: 'chat.completion.chunk', created, model, - choices: [{ index: 0, delta: { content: cached.text }, finish_reason: null }] }); + send({ + id, object: 'chat.completion.chunk', created, model, + choices: [{ index: 0, delta: { content: cached.text }, finish_reason: null }] + }); } - send({ id, object: 'chat.completion.chunk', created, model, - choices: [{ index: 0, delta: {}, finish_reason: 'stop' }] }); - send({ id, object: 'chat.completion.chunk', created, model, - choices: [], usage: cachedUsage(messages, cached.text) }); + send({ + id, object: 'chat.completion.chunk', created, model, + choices: [{ index: 0, delta: {}, finish_reason: 'stop' }] + }); + send({ + id, object: 'chat.completion.chunk', created, model, + choices: [], usage: cachedUsage(messages, cached.text) + }); if (!res.writableEnded) { res.write('data: [DONE]\n\n'); res.end(); } } finally { unregisterSse(); @@ -3203,11 +3238,11 @@ function streamResponse(id, created, model, modelKey, provider, messages, cascad // every attempt hit it. let streamInternalCount = 0; // Dynamic: try every active account in the pool (capped at 10) so a - // large pool with many rate-limited accounts can still fall through - // to a free one. Was hardcoded 3 — in pools bigger than 3 with the - // first accounts rate-limited, healthy accounts were never reached - // even though they would have worked (issue #5). - const maxAttempts = Math.min(10, Math.max(3, getAccountList().filter(a => a.status === 'active').length)); + // large pool with many rate-limited accounts can still fall through + // to a free one. Was hardcoded 3 — in pools bigger than 3 with the + // first accounts rate-limited, healthy accounts were never reached + // even though they would have worked (issue #5). + const maxAttempts = Math.min(10, Math.max(3, getAccountList().filter(a => a.status === 'active').length)); // Accumulate chunks so we can cache a successful response at the end. let accText = ''; @@ -3296,28 +3331,36 @@ function streamResponse(id, created, model, modelKey, provider, messages, cascad // lookahead). On finish we'll emit one clean JSON payload. if (wantJson) return; emittedClientPayload = true; - send({ id, object: 'chat.completion.chunk', created, model, - choices: [{ index: 0, delta: { content: clean }, finish_reason: null }] }); + send({ + id, object: 'chat.completion.chunk', created, model, + choices: [{ index: 0, delta: { content: clean }, finish_reason: null }] + }); }; const emitThinking = (clean) => { if (!clean) return; accThinking += clean; emittedClientPayload = true; - send({ id, object: 'chat.completion.chunk', created, model, - choices: [{ index: 0, delta: { reasoning_content: clean }, finish_reason: null }] }); + send({ + id, object: 'chat.completion.chunk', created, model, + choices: [{ index: 0, delta: { reasoning_content: clean }, finish_reason: null }] + }); }; const emitToolCallDelta = (tc, idx) => { emittedClientPayload = true; - send({ id, object: 'chat.completion.chunk', created, model, - choices: [{ index: 0, delta: { - tool_calls: [{ - index: idx, - id: tc.id, - type: 'function', - function: { name: tc.name, arguments: sanitizeText(tc.argumentsJson || '{}') }, - }], - }, finish_reason: null }] }); + send({ + id, object: 'chat.completion.chunk', created, model, + choices: [{ + index: 0, delta: { + tool_calls: [{ + index: idx, + id: tc.id, + type: 'function', + function: { name: tc.name, arguments: sanitizeText(tc.argumentsJson || '{}') }, + }], + }, finish_reason: null + }] + }); }; const emitNativeFallbackCalls = (rawCalls) => { @@ -3337,8 +3380,10 @@ function streamResponse(id, created, model, modelKey, provider, messages, cascad const onChunk = (chunk) => { if (!rolePrinted) { rolePrinted = true; - send({ id, object: 'chat.completion.chunk', created, model, - choices: [{ index: 0, delta: { role: 'assistant', content: '' }, finish_reason: null }] }); + send({ + id, object: 'chat.completion.chunk', created, model, + choices: [{ index: 0, delta: { role: 'assistant', content: '' }, finish_reason: null }] + }); } hadSuccess = true; @@ -3353,7 +3398,7 @@ function streamResponse(id, created, model, modelKey, provider, messages, cascad recordNativeBridgeCascadeToolCall(raw.name); } if (isCompletedReadUrlNativeResult(raw) - && !completedNativeReadUrlResults.some(existing => (existing.id || '') === (raw.id || ''))) { + && !completedNativeReadUrlResults.some(existing => (existing.id || '') === (raw.id || ''))) { completedNativeReadUrlResults.push(raw); } return; @@ -3522,8 +3567,8 @@ function streamResponse(id, created, model, modelKey, provider, messages, cascad const reason = tempUnavail.allUnavailable ? `所有可用账号暂时不可用,请 ${Math.ceil(tempUnavail.retryAfterMs / 1000)} 秒后重试` : rateLimited.allLimited - ? `所有可用账号均已达速率限制,请 ${Math.ceil(rateLimited.retryAfterMs / 1000)} 秒后重试` - : `${Math.ceil(QUEUE_MAX_WAIT_MS / 1000)} 秒内没有账号变为可用 — 账号可能被速率限制或对当前模型无权限`; + ? `所有可用账号均已达速率限制,请 ${Math.ceil(rateLimited.retryAfterMs / 1000)} 秒后重试` + : `${Math.ceil(QUEUE_MAX_WAIT_MS / 1000)} 秒内没有账号变为可用 — 账号可能被速率限制或对当前模型无权限`; lastErr = Object.assign( new Error(`${model} 账号队列超时: ${reason}`), { type: (tempUnavail.allUnavailable || rateLimited.allLimited) ? 'rate_limit_exceeded' : 'pool_exhausted' } @@ -3551,428 +3596,438 @@ function streamResponse(id, created, model, modelKey, provider, messages, cascad log.info(`Chat[${reqId}]: native bridge account gate skipped ${safeAccountRef(acct)}`); continue; } - // Pre-flight rate limit check (experimental) - if (isExperimentalEnabled('preflightRateLimit')) { - try { - const px = getEffectiveProxy(acct.id) || null; - const rl = await checkMessageRateLimitFn(acct.apiKey, px); - if (!rl.hasCapacity) { - log.warn(`Preflight: ${safeAccountRef(acct)} has no capacity (remaining=${rl.messagesRemaining}), skipping`); - refundReservation(acct.apiKey, acct.reservationTimestamp); - if (Number.isFinite(rl.retryAfterMs) && rl.retryAfterMs > 0) { - markRateLimited(acct.apiKey, rl.retryAfterMs, modelKey); - } - if (!reuseEntryDead && strictReuse && checkedOutReuseEntry && fpBefore && checkedOutReuseEntry.apiKey === acct.apiKey) { - const availability = getAccountAvailability(acct.apiKey, modelKey); - const retryAfterMs = strictReuseRetryMs(availability); - lastErr = Object.assign( - new Error(strictReuseMessage(model, retryAfterMs, availability.reason)), - { type: 'rate_limit_exceeded' } - ); - log.info(`Chat[${reqId}]: strict reuse preserved cascade after preflight rate limit`); - break; + // Pre-flight rate limit check (experimental) + if (isExperimentalEnabled('preflightRateLimit')) { + try { + const px = getEffectiveProxy(acct.id) || null; + const rl = await checkMessageRateLimitFn(acct.apiKey, px); + if (!rl.hasCapacity) { + log.warn(`Preflight: ${safeAccountRef(acct)} has no capacity (remaining=${rl.messagesRemaining}), skipping`); + refundReservation(acct.apiKey, acct.reservationTimestamp); + if (Number.isFinite(rl.retryAfterMs) && rl.retryAfterMs > 0) { + markRateLimited(acct.apiKey, rl.retryAfterMs, modelKey); + } + if (!reuseEntryDead && strictReuse && checkedOutReuseEntry && fpBefore && checkedOutReuseEntry.apiKey === acct.apiKey) { + const availability = getAccountAvailability(acct.apiKey, modelKey); + const retryAfterMs = strictReuseRetryMs(availability); + lastErr = Object.assign( + new Error(strictReuseMessage(model, retryAfterMs, availability.reason)), + { type: 'rate_limit_exceeded' } + ); + log.info(`Chat[${reqId}]: strict reuse preserved cascade after preflight rate limit`); + break; + } + continue; } - continue; + } catch (e) { + log.debug(`Preflight check failed for ${safeAccountRef(acct)}: ${e.message}`); } - } catch (e) { - log.debug(`Preflight check failed for ${safeAccountRef(acct)}: ${e.message}`); } - } - try { await ensureLsFn(acct.proxy); } catch (e) { - lastErr = isLsPoolExhausted(e) - ? Object.assign(new Error(e.message), { type: 'ls_pool_exhausted', status: e.status || 503 }) - : e; - break; - } - const ls = getLsForFn(acct.proxy); - if (!ls) { lastErr = new Error('No LS instance available'); break; } - if (reuseEntry && reuseEntry.lsPort !== ls.port) { - log.info(`Chat[${reqId}]: reuse MISS — LS port changed`); - checkedOutReuseEntry = null; - reuseEntry = null; - } - const _msgCharsStream = (messages || []).reduce((n, m) => { - const c = m?.content; - return n + (typeof c === 'string' ? c.length : Array.isArray(c) ? c.reduce((k, p) => k + (typeof p?.text === 'string' ? p.text.length : 0), 0) : 0); - }, 0); - log.info(`Chat: model=${model} flow=${useCascade ? 'cascade' : 'legacy'} stream=true attempt=${attempt + 1} ${safeAccountRef(acct)} ls=${ls.port} turns=${(messages||[]).length} chars=${_msgCharsStream}${reuseEntry ? ' reuse=1' : ''}`); - const client = new ClientClass(acct.apiKey, ls.port, ls.csrfToken); - let cascadeResult = null; - try { - if (useCascade) { - cascadeResult = await client.cascadeChat(cascadeMessages, modelEnum, modelUid, { - onChunk, signal: abortController.signal, reuseEntry, - toolPreamble: nativeBridgeOn ? '' : toolPreamble, - nativeEnvironment: nativeBridgeOn ? (nativeOpts?.environment || '') : '', - displayModel: model, - nativeMode: nativeBridgeOn, - nativeAllowlist: nativeOpts?.allowlist || null, - additionalSteps: nativeOpts?.additionalSteps || null, - }); - } else { - await client.rawGetChatMessage(messages, modelEnum, modelUid, { onChunk }); + try { await ensureLsFn(acct.proxy); } catch (e) { + lastErr = isLsPoolExhausted(e) + ? Object.assign(new Error(e.message), { type: 'ls_pool_exhausted', status: e.status || 503 }) + : e; + break; } - // Flush order matters: - // 1. NativeFunctionCallStreamParser tail → may produce provider - // native tool_calls withheld from content deltas - // 2. ToolCallStreamParser tail → may produce more text deltas - // (e.g., a dangling that never closed falls - // through as literal text) - // 3. PathSanitizeStream tail (text) → scrubs anything the tool - // parser held back AND anything we were holding ourselves - // 4. PathSanitizeStream tail (thinking) - if (nativeFunctionParser) { - const nativeTail = nativeFunctionParser.flush(); - const emitted = emitNativeFallbackCalls(nativeTail.toolCalls); - if (emitted) { - log.info(`Chat[stream]: native bridge parsed ${emitted} provider-native function_call(s) from stream tail`); - } - if (nativeTail.text) emitContent(pathStreamText.feed(nativeTail.text)); + const ls = getLsForFn(acct.proxy); + if (!ls) { lastErr = new Error('No LS instance available'); break; } + if (reuseEntry && reuseEntry.lsPort !== ls.port) { + log.info(`Chat[${reqId}]: reuse MISS — LS port changed`); + checkedOutReuseEntry = null; + reuseEntry = null; } - if (toolParser) { - const tail = toolParser.flush(); - if (tail.text) emitContent(pathStreamText.feed(tail.text)); - // M2 allowlist on the tail flush as well — stream end can - // still emit tail tool_calls and they need the same filter. - const filteredTail = emulateTools - ? filterToolCallsByAllowlist(tail.toolCalls, declaredTools) - : []; - for (const rawTc of filteredTail) { - const tc = sanitizeToolCall(repairToolCallArguments(rawTc, messages)); - const idx = collectedToolCalls.length; - collectedToolCalls.push(tc); - bridgeDiag.emulatedToolCalls++; - bridgeDiag.emulatedNames.push(tc.name); - emitToolCallDelta(tc, idx); + const _msgCharsStream = (messages || []).reduce((n, m) => { + const c = m?.content; + return n + (typeof c === 'string' ? c.length : Array.isArray(c) ? c.reduce((k, p) => k + (typeof p?.text === 'string' ? p.text.length : 0), 0) : 0); + }, 0); + log.info(`Chat: model=${model} flow=${useCascade ? 'cascade' : 'legacy'} stream=true attempt=${attempt + 1} ${safeAccountRef(acct)} ls=${ls.port} turns=${(messages || []).length} chars=${_msgCharsStream}${reuseEntry ? ' reuse=1' : ''}`); + const client = new ClientClass(acct.apiKey, ls.port, ls.csrfToken); + let cascadeResult = null; + try { + if (useCascade) { + cascadeResult = await client.cascadeChat(cascadeMessages, modelEnum, modelUid, { + onChunk, signal: abortController.signal, reuseEntry, + toolPreamble: nativeBridgeOn ? '' : toolPreamble, + nativeEnvironment: nativeBridgeOn ? (nativeOpts?.environment || '') : '', + displayModel: model, + nativeMode: nativeBridgeOn, + nativeAllowlist: nativeOpts?.allowlist || null, + additionalSteps: nativeOpts?.additionalSteps || null, + }); + } else { + await client.rawGetChatMessage(messages, modelEnum, modelUid, { onChunk }); + } + // Flush order matters: + // 1. NativeFunctionCallStreamParser tail → may produce provider + // native tool_calls withheld from content deltas + // 2. ToolCallStreamParser tail → may produce more text deltas + // (e.g., a dangling that never closed falls + // through as literal text) + // 3. PathSanitizeStream tail (text) → scrubs anything the tool + // parser held back AND anything we were holding ourselves + // 4. PathSanitizeStream tail (thinking) + if (nativeFunctionParser) { + const nativeTail = nativeFunctionParser.flush(); + const emitted = emitNativeFallbackCalls(nativeTail.toolCalls); + if (emitted) { + log.info(`Chat[stream]: native bridge parsed ${emitted} provider-native function_call(s) from stream tail`); + } + if (nativeTail.text) emitContent(pathStreamText.feed(nativeTail.text)); } - // Diagnostic: same as nonStreamResponse but for the SSE path — - // surface why no tool_calls came out when emulation was active. - // See nonStreamResponse for marker rationale (#109 sub2api E2E). - // v2.0.72 fix: see non-stream comment — combine accText + - // accThinking for marker / NLU detection so models that - // route narrate output through reasoning_content (GLM-4.7, - // some Claude models in thinking mode) don't slip past. - const accNarrative = (accText && accText.trim()) ? accText : accThinking; - if (emulateTools && collectedToolCalls.length === 0 && accNarrative) { - const head = accNarrative.slice(0, 240).replace(/\s+/g, ' '); - const markers = []; - if (/ markup, extract + emit as - // tool_call delta so client agent loop doesn't break. - // - // v2.0.76 (#120 follow-up): widened to fire even when - // markers were detected but parser produced 0 calls - // (mirrors the non-stream path). - if (declaredTools.length > 0) { - const lastUser = latestRealUserText(messages) || ''; - const recovered = extractIntentFromNarrative(accNarrative, declaredTools, { lastUserText: lastUser, markers }); - if (recovered.length) { - const recoveredCalls = recovered.map((r, i) => ({ - id: `nlu_${i}_${Date.now().toString(36)}`, - name: r.name, - argumentsJson: r.argumentsJson, - })); - const filtered = filterToolCallsByAllowlist(recoveredCalls, declaredTools); - for (const rawTc of filtered) { - const tc = sanitizeToolCall(repairToolCallArguments(rawTc, messages)); - const idx = collectedToolCalls.length; - collectedToolCalls.push(tc); - bridgeDiag.emulatedToolCalls++; - bridgeDiag.emulatedNames.push(tc.name); - emitToolCallDelta(tc, idx); + if (toolParser) { + const tail = toolParser.flush(); + if (tail.text) emitContent(pathStreamText.feed(tail.text)); + // M2 allowlist on the tail flush as well — stream end can + // still emit tail tool_calls and they need the same filter. + const filteredTail = emulateTools + ? filterToolCallsByAllowlist(tail.toolCalls, declaredTools) + : []; + for (const rawTc of filteredTail) { + const tc = sanitizeToolCall(repairToolCallArguments(rawTc, messages)); + const idx = collectedToolCalls.length; + collectedToolCalls.push(tc); + bridgeDiag.emulatedToolCalls++; + bridgeDiag.emulatedNames.push(tc.name); + emitToolCallDelta(tc, idx); + } + // Diagnostic: same as nonStreamResponse but for the SSE path — + // surface why no tool_calls came out when emulation was active. + // See nonStreamResponse for marker rationale (#109 sub2api E2E). + // v2.0.72 fix: see non-stream comment — combine accText + + // accThinking for marker / NLU detection so models that + // route narrate output through reasoning_content (GLM-4.7, + // some Claude models in thinking mode) don't slip past. + const accNarrative = (accText && accText.trim()) ? accText : accThinking; + if (emulateTools && collectedToolCalls.length === 0 && accNarrative) { + const head = accNarrative.slice(0, 240).replace(/\s+/g, ' '); + const markers = []; + if (/ markup, extract + emit as + // tool_call delta so client agent loop doesn't break. + // + // v2.0.76 (#120 follow-up): widened to fire even when + // markers were detected but parser produced 0 calls + // (mirrors the non-stream path). + if (declaredTools.length > 0) { + const lastUser = latestRealUserText(messages) || ''; + const recovered = extractIntentFromNarrative(accNarrative, declaredTools, { lastUserText: lastUser, markers }); + if (recovered.length) { + const recoveredCalls = recovered.map((r, i) => ({ + id: `nlu_${i}_${Date.now().toString(36)}`, + name: r.name, + argumentsJson: r.argumentsJson, + })); + const filtered = filterToolCallsByAllowlist(recoveredCalls, declaredTools); + for (const rawTc of filtered) { + const tc = sanitizeToolCall(repairToolCallArguments(rawTc, messages)); + const idx = collectedToolCalls.length; + collectedToolCalls.push(tc); + bridgeDiag.emulatedToolCalls++; + bridgeDiag.emulatedNames.push(tc.name); + emitToolCallDelta(tc, idx); + } + if (filtered.length) { + log.info(`Chat[stream]: NLU recovery — promoted ${filtered.length} narrative tool_call(s) mid-stream (markers=${markers.join(',') || 'none'})`); + } } - if (filtered.length) { - log.info(`Chat[stream]: NLU recovery — promoted ${filtered.length} narrative tool_call(s) mid-stream (markers=${markers.join(',') || 'none'})`); + } + // v2.0.71 (#115) — fabricate detection on stream tail + // (only if NLU didn't recover anything). + if (markers.length === 0 && collectedToolCalls.length === 0) { + const lastUser = latestRealUserText(messages) || ''; + const fab = detectFabricatedToolResult(accNarrative, { lastUserText: lastUser }); + if (fab) { + log.warn(`Chat[stream]: fabricate detected — model=${modelKey} pattern=${fab.matchedPattern} sample="${fab.sample}"`); } } } - // v2.0.71 (#115) — fabricate detection on stream tail - // (only if NLU didn't recover anything). - if (markers.length === 0 && collectedToolCalls.length === 0) { - const lastUser = latestRealUserText(messages) || ''; - const fab = detectFabricatedToolResult(accNarrative, { lastUserText: lastUser }); - if (fab) { - log.warn(`Chat[stream]: fabricate detected — model=${modelKey} pattern=${fab.matchedPattern} sample="${fab.sample}"`); + } + emitContent(pathStreamText.flush()); + emitThinking(pathStreamThinking.flush()); + + // v2.0.65 native bridge: cascade trajectory steps come back on + // cascadeResult.toolCalls with cascade_native:true. Translate + // each into the caller's OpenAI tool name + reverse-mapped + // args, allowlist-filter, then emit as tool_call deltas. We do + // this at the tail (after pathStreamText flush) rather than + // mid-stream because cascadeChat doesn't expose per-step + // callbacks for native steps yet — clients see one batched + // tool_calls turn instead of fully-streamed deltas. That's a + // known gap; trades streaming-grain for shipping a working + // bridge first. + // v2.0.70 — onChunk now emits cascade native tool_calls + // mid-stream (see "Cascade native trajectory tool_call + // streamed live" branch above). The batch path here only + // catches the tail case where collectedToolCalls is still + // empty after stream end (e.g. final-sweep step came late); + // dedupe by id so we never emit a tool_call twice. + if (nativeBridgeOn && cascadeResult?.toolCalls?.length && collectedToolCalls.length === 0) { + const lookup = nativeOpts?.callerLookup || new Map(); + const nativeRaw = []; + for (const raw of cascadeResult.toolCalls) { + if (!raw?.cascade_native) continue; + if (markBridgeDiagCascadeRaw(raw)) { + recordNativeBridgeCascadeToolCall(raw.name); + } + if (isCompletedReadUrlNativeResult(raw)) { + if (!completedNativeReadUrlResults.some(existing => (existing.id || '') === (raw.id || ''))) { + completedNativeReadUrlResults.push(raw); + } + continue; } + const candidates = lookup.get(raw.name) || []; + const callerName = candidates[0]; + if (!callerName) { + bridgeDiag.unmappedToolCalls++; + bridgeDiag.unmappedKinds.push(raw.name); + recordNativeBridgeUnmappedCascadeToolCall(raw.name); + continue; + } + const reverseFn = TOOL_MAP[callerName]?.reverse; + let cascadeArgs; + try { cascadeArgs = JSON.parse(raw.argumentsJson || '{}'); } catch { bridgeDiag.argParseFailures++; cascadeArgs = {}; } + let openaiArgs; + try { openaiArgs = reverseFn ? reverseFn(cascadeArgs) : cascadeArgs; } + catch { bridgeDiag.reverseFailures++; openaiArgs = cascadeArgs; } + bridgeDiag.mappedToolCalls++; + bridgeDiag.mappedNames.push(callerName); + nativeRaw.push({ + id: raw.id || `call_${nativeRaw.length}_${Date.now().toString(36)}`, + name: callerName, + argumentsJson: JSON.stringify(openaiArgs ?? {}), + }); } - } - } - emitContent(pathStreamText.flush()); - emitThinking(pathStreamThinking.flush()); - - // v2.0.65 native bridge: cascade trajectory steps come back on - // cascadeResult.toolCalls with cascade_native:true. Translate - // each into the caller's OpenAI tool name + reverse-mapped - // args, allowlist-filter, then emit as tool_call deltas. We do - // this at the tail (after pathStreamText flush) rather than - // mid-stream because cascadeChat doesn't expose per-step - // callbacks for native steps yet — clients see one batched - // tool_calls turn instead of fully-streamed deltas. That's a - // known gap; trades streaming-grain for shipping a working - // bridge first. - // v2.0.70 — onChunk now emits cascade native tool_calls - // mid-stream (see "Cascade native trajectory tool_call - // streamed live" branch above). The batch path here only - // catches the tail case where collectedToolCalls is still - // empty after stream end (e.g. final-sweep step came late); - // dedupe by id so we never emit a tool_call twice. - if (nativeBridgeOn && cascadeResult?.toolCalls?.length && collectedToolCalls.length === 0) { - const lookup = nativeOpts?.callerLookup || new Map(); - const nativeRaw = []; - for (const raw of cascadeResult.toolCalls) { - if (!raw?.cascade_native) continue; - if (markBridgeDiagCascadeRaw(raw)) { - recordNativeBridgeCascadeToolCall(raw.name); + const filteredNative = filterToolCallsByAllowlist(nativeRaw, declaredTools); + for (const rawTc of filteredNative) { + const tc = sanitizeToolCall(repairToolCallArguments(rawTc, messages)); + const idx = collectedToolCalls.length; + collectedToolCalls.push(tc); + recordNativeBridgeEmittedToolCall(tc.name, { source: 'cascade' }); + emitToolCallDelta(tc, idx); } - if (isCompletedReadUrlNativeResult(raw)) { - if (!completedNativeReadUrlResults.some(existing => (existing.id || '') === (raw.id || ''))) { - completedNativeReadUrlResults.push(raw); - } - continue; + if (filteredNative.length === 0 && completedNativeReadUrlResults.length === 0 && cascadeResult.toolCalls.some(tc => tc.cascade_native)) { + log.info(`Chat[stream]: nativeBridge=true received cascade tool calls but none mapped to caller tools (kinds=${cascadeResult.toolCalls.filter(tc => tc.cascade_native).map(tc => tc.name).join(',')})`); } - const candidates = lookup.get(raw.name) || []; - const callerName = candidates[0]; - if (!callerName) { - bridgeDiag.unmappedToolCalls++; - bridgeDiag.unmappedKinds.push(raw.name); - recordNativeBridgeUnmappedCascadeToolCall(raw.name); - continue; + } + if (accText.length === 0 && collectedToolCalls.length === 0 && completedNativeReadUrlResults.length) { + const fallbackText = completedNativeReadUrlResults.map(raw => raw.result).filter(Boolean).join('\n'); + if (fallbackText) emitContent(pathStreamText.feed(fallbackText)); + emitContent(pathStreamText.flush()); + } + if (nativeBridgeOn && collectedToolCalls.length === 0 && completedNativeReadUrlResults.length === 0) recordNativeBridgeNoToolCallResponse(); + bridgeDiag.totalToolCalls = collectedToolCalls.length; + bridgeDiag.noToolCalls = bridgeDiag.requestedTools && collectedToolCalls.length === 0 && completedNativeReadUrlResults.length === 0; + logBridgeResultDiagnostics(reqId, bridgeDiag); + // Pool check-in on success (cascade only) + if (reuseEnabled && cascadeResult?.cascadeId && (accText || collectedToolCalls.length)) { + const turnComplete = appendAssistantTurn(messages, accText, collectedToolCalls); + const fpAfter = fingerprintAfter(turnComplete, modelKey, callerKey, fpOpts); + const ttlHint = ttlHintFromCachePolicy(cachePolicy); + poolCheckin(fpAfter, { + cascadeId: cascadeResult.cascadeId, + sessionId: cascadeResult.sessionId, + lsPort: ls.port, + lsGeneration: cascadeResult.lsGeneration || ls.generation, + apiKey: currentApiKey, + modelKey: modelKey || '', + stepOffset: Number.isFinite(cascadeResult.stepOffset) ? cascadeResult.stepOffset : reuseEntry?.stepOffset, + generatorOffset: Number.isFinite(cascadeResult.generatorOffset) ? cascadeResult.generatorOffset : reuseEntry?.generatorOffset, + historyCoverage: cascadeResult.historyCoverage || reuseEntry?.historyCoverage || null, + createdAt: reuseEntry?.createdAt, + }, callerKey, ttlHint === undefined ? 0 : ttlHint); + + // Bind caller to this account for the next turn + if (callerKey && isStickyEnabled() && acct) { + setStickyBinding(callerKey, modelKey, acct.id, acct.apiKey); } - const reverseFn = TOOL_MAP[callerName]?.reverse; - let cascadeArgs; - try { cascadeArgs = JSON.parse(raw.argumentsJson || '{}'); } catch { bridgeDiag.argParseFailures++; cascadeArgs = {}; } - let openaiArgs; - try { openaiArgs = reverseFn ? reverseFn(cascadeArgs) : cascadeArgs; } - catch { bridgeDiag.reverseFailures++; openaiArgs = cascadeArgs; } - bridgeDiag.mappedToolCalls++; - bridgeDiag.mappedNames.push(callerName); - nativeRaw.push({ - id: raw.id || `call_${nativeRaw.length}_${Date.now().toString(36)}`, - name: callerName, - argumentsJson: JSON.stringify(openaiArgs ?? {}), + } + // success + if (hadSuccess) reportSuccess(currentApiKey); + updateCapability(currentApiKey, modelKey, true, 'success'); + recordRequest(model, true, Date.now() - startTime, currentApiKey); + if (!rolePrinted) { + send({ + id, object: 'chat.completion.chunk', created, model, + choices: [{ index: 0, delta: { role: 'assistant', content: '' }, finish_reason: null }] }); } - const filteredNative = filterToolCallsByAllowlist(nativeRaw, declaredTools); - for (const rawTc of filteredNative) { - const tc = sanitizeToolCall(repairToolCallArguments(rawTc, messages)); - const idx = collectedToolCalls.length; - collectedToolCalls.push(tc); - recordNativeBridgeEmittedToolCall(tc.name, { source: 'cascade' }); - emitToolCallDelta(tc, idx); + // For response_format=json_* we buffered all content — flush one + // clean JSON payload now. extractJsonPayload strips fences and + // any preamble text, returning raw parseable JSON (or the + // trimmed original when nothing parses). + if (wantJson && accText) { + const cleaned = stabilizeJsonPayload(accText, messages); + if (cleaned) { + send({ + id, object: 'chat.completion.chunk', created, model, + choices: [{ index: 0, delta: { content: cleaned }, finish_reason: null }] + }); + accText = cleaned; + } } - if (filteredNative.length === 0 && completedNativeReadUrlResults.length === 0 && cascadeResult.toolCalls.some(tc => tc.cascade_native)) { - log.info(`Chat[stream]: nativeBridge=true received cascade tool calls but none mapped to caller tools (kinds=${cascadeResult.toolCalls.filter(tc => tc.cascade_native).map(tc => tc.name).join(',')})`); + // GLM5.1 silence fallback (#86 follow-up KLFDan0534) — see + // shouldFallbackThinkingToText comment for rationale. + // Inside streamResponse the routing key arrives as the + // `modelKey` param (caller passes routingModelKey there); + // wantThinking comes through deps because body isn't in + // scope here (#93 follow-up zhangzhang-bit). + if (shouldFallbackThinkingToText({ + routingModelKey: modelKey, + wantThinking: deps.wantThinking, + accText, + accThinking, + hasToolCalls: collectedToolCalls.length > 0, + })) { + log.info(`Chat[${reqId}]: thinking-only stream from non-reasoning model ${modelKey}; promoting ${accThinking.length}c thinking → content`); + send({ + id, object: 'chat.completion.chunk', created, model, + choices: [{ index: 0, delta: { content: accThinking }, finish_reason: null }] + }); + accText = accThinking; + accThinking = ''; } - } - if (accText.length === 0 && collectedToolCalls.length === 0 && completedNativeReadUrlResults.length) { - const fallbackText = completedNativeReadUrlResults.map(raw => raw.result).filter(Boolean).join('\n'); - if (fallbackText) emitContent(pathStreamText.feed(fallbackText)); - emitContent(pathStreamText.flush()); - } - if (nativeBridgeOn && collectedToolCalls.length === 0 && completedNativeReadUrlResults.length === 0) recordNativeBridgeNoToolCallResponse(); - bridgeDiag.totalToolCalls = collectedToolCalls.length; - bridgeDiag.noToolCalls = bridgeDiag.requestedTools && collectedToolCalls.length === 0 && completedNativeReadUrlResults.length === 0; - logBridgeResultDiagnostics(reqId, bridgeDiag); - // Pool check-in on success (cascade only) - if (reuseEnabled && cascadeResult?.cascadeId && (accText || collectedToolCalls.length)) { - const turnComplete = appendAssistantTurn(messages, accText, collectedToolCalls); - const fpAfter = fingerprintAfter(turnComplete, modelKey, callerKey, fpOpts); - const ttlHint = ttlHintFromCachePolicy(cachePolicy); - poolCheckin(fpAfter, { - cascadeId: cascadeResult.cascadeId, - sessionId: cascadeResult.sessionId, - lsPort: ls.port, - lsGeneration: cascadeResult.lsGeneration || ls.generation, - apiKey: currentApiKey, - modelKey: modelKey || '', - stepOffset: Number.isFinite(cascadeResult.stepOffset) ? cascadeResult.stepOffset : reuseEntry?.stepOffset, - generatorOffset: Number.isFinite(cascadeResult.generatorOffset) ? cascadeResult.generatorOffset : reuseEntry?.generatorOffset, - historyCoverage: cascadeResult.historyCoverage || reuseEntry?.historyCoverage || null, - createdAt: reuseEntry?.createdAt, - }, callerKey, ttlHint === undefined ? 0 : ttlHint); - - // Bind caller to this account for the next turn - if (callerKey && isStickyEnabled() && acct) { - setStickyBinding(callerKey, modelKey, acct.id, acct.apiKey); + const finalReason = collectedToolCalls.length ? 'tool_calls' : 'stop'; + // OpenAI spec: the finish_reason chunk carries NO usage, then a + // separate terminal chunk has empty choices[] + usage + // (stream_options.include_usage convention). Emitting usage on + // both made some clients double-count billing. Drop the first. + send({ + id, object: 'chat.completion.chunk', created, model, + choices: [{ index: 0, delta: {}, finish_reason: finalReason }] + }); + { + const usage = buildUsageBody(cascadeResult?.usage || null, messages, accText, accThinking, cachePolicy); + try { recordTokenUsage(usage); } catch { } + send({ + id, object: 'chat.completion.chunk', created, model, + choices: [], usage + }); } - } - // success - if (hadSuccess) reportSuccess(currentApiKey); - updateCapability(currentApiKey, modelKey, true, 'success'); - recordRequest(model, true, Date.now() - startTime, currentApiKey); - if (!rolePrinted) { - send({ id, object: 'chat.completion.chunk', created, model, - choices: [{ index: 0, delta: { role: 'assistant', content: '' }, finish_reason: null }] }); - } - // For response_format=json_* we buffered all content — flush one - // clean JSON payload now. extractJsonPayload strips fences and - // any preamble text, returning raw parseable JSON (or the - // trimmed original when nothing parses). - if (wantJson && accText) { - const cleaned = stabilizeJsonPayload(accText, messages); - if (cleaned) { - send({ id, object: 'chat.completion.chunk', created, model, - choices: [{ index: 0, delta: { content: cleaned }, finish_reason: null }] }); - accText = cleaned; + if (!res.writableEnded) { res.write('data: [DONE]\n\n'); res.end(); } + if (ckey && !collectedToolCalls.length && (accText || accThinking)) { + cacheSet(ckey, { text: accText, thinking: accThinking }); } - } - // GLM5.1 silence fallback (#86 follow-up KLFDan0534) — see - // shouldFallbackThinkingToText comment for rationale. - // Inside streamResponse the routing key arrives as the - // `modelKey` param (caller passes routingModelKey there); - // wantThinking comes through deps because body isn't in - // scope here (#93 follow-up zhangzhang-bit). - if (shouldFallbackThinkingToText({ - routingModelKey: modelKey, - wantThinking: deps.wantThinking, - accText, - accThinking, - hasToolCalls: collectedToolCalls.length > 0, - })) { - log.info(`Chat[${reqId}]: thinking-only stream from non-reasoning model ${modelKey}; promoting ${accThinking.length}c thinking → content`); - send({ id, object: 'chat.completion.chunk', created, model, - choices: [{ index: 0, delta: { content: accThinking }, finish_reason: null }] }); - accText = accThinking; - accThinking = ''; - } - const finalReason = collectedToolCalls.length ? 'tool_calls' : 'stop'; - // OpenAI spec: the finish_reason chunk carries NO usage, then a - // separate terminal chunk has empty choices[] + usage - // (stream_options.include_usage convention). Emitting usage on - // both made some clients double-count billing. Drop the first. - send({ id, object: 'chat.completion.chunk', created, model, - choices: [{ index: 0, delta: {}, finish_reason: finalReason }] }); - { - const usage = buildUsageBody(cascadeResult?.usage || null, messages, accText, accThinking, cachePolicy); - try { recordTokenUsage(usage); } catch {} - send({ id, object: 'chat.completion.chunk', created, model, - choices: [], usage }); - } - if (!res.writableEnded) { res.write('data: [DONE]\n\n'); res.end(); } - if (ckey && !collectedToolCalls.length && (accText || accThinking)) { - cacheSet(ckey, { text: accText, thinking: accThinking }); - } - return; - } catch (err) { - lastErr = err; - reuseEntry = null; // don't try to reuse on retry - // v2.0.25 HIGH-2: client.js marks the error when it tried to - // recover from a "cascade not found" but couldn't. The entry - // we held is dead — never restore it on the way out. - if (err.reuseEntryInvalid) reuseEntryDead = true; - // #101 (nalayahfowlkest-ship-it): when the upstream model - // provider times out mid-stream ("context deadline exceeded" - // / "Client.Timeout or context cancellation while reading - // body"), the cascade trajectory is left in an inconsistent - // state — the assistant never finished, but the prior - // tool_result is still in there. Restoring this cascade to - // the pool causes the NEXT request to reuse a half-broken - // trajectory, and the model only sees the trailing tool - // result with no earlier user prompts ("I can see the - // content from a previous tool call ... but I don't have - // the earlier conversation context"). - const isDeadline = isUpstreamDeadlineExceeded(err); - if (isDeadline) { - reuseEntryDead = true; - } - const isAuthFail = /unauthenticated|invalid api key|invalid_grant|permission_denied.*account/i.test(err.message); - const isRateLimit = /rate limit|rate_limit|too many requests|quota/i.test(err.message); - const isInternal = /internal error occurred.*error id/i.test(err.message); - const isTransport = isCascadeTransportError(err); - const isTransient = !isDeadline && isUpstreamTransientError(err, isInternal); - // v2.0.61 (#113) — same policy detection as nonStreamResponse. - const isPolicyBlocked = /cyber\s*verification|content[\s_-]+policy|policy[\s_-]+(?:violation|blocked|denied)|safety[\s_-]+(?:policy|blocked)|prompt[\s_-]+(?:rejected|blocked)\s+by[\s_-]+policy|usage[\s_-]+policy[\s_-]+violation/i.test(err.message); - if (isAuthFail) reportError(currentApiKey); - if (isRateLimit) { recordRateLimited(); markRateLimited(currentApiKey, rateLimitCooldownMs(err.message), modelKey); err.isRateLimit = true; err.isModelError = true; err.kind ||= 'model_error'; } - // v2.0.91 — IP-level rate limit circuit breaker (stream path). - // Same logic as non-stream: ≥3 accounts rate-limited for the - // same model within 8s → Windsurf is doing IP-wide cooldown, - // stop burning accounts and surface immediately. - const ctx = deps.context || {}; - if (isRateLimit && !ctx.__rlAborted) { - if (!ctx.__rateLimitEvents) ctx.__rateLimitEvents = []; - const RL_WINDOW_MS = 8_000; - const RL_BURST_THRESHOLD = 3; - const now = Date.now(); - ctx.__rateLimitEvents.push({ - time: now, - model: modelKey, - account: acct?.id, - cooldownMs: rateLimitBurstCooldownMs({ - message: err.message || '', - retryAfterMs: err.retry_after_ms, - apiKey: currentApiKey, - modelKey, - }), - }); - const cutoff = now - RL_WINDOW_MS; - while (ctx.__rateLimitEvents.length && ctx.__rateLimitEvents[0].time < cutoff) { - ctx.__rateLimitEvents.shift(); + return; + } catch (err) { + lastErr = err; + reuseEntry = null; // don't try to reuse on retry + // v2.0.25 HIGH-2: client.js marks the error when it tried to + // recover from a "cascade not found" but couldn't. The entry + // we held is dead — never restore it on the way out. + if (err.reuseEntryInvalid) reuseEntryDead = true; + // #101 (nalayahfowlkest-ship-it): when the upstream model + // provider times out mid-stream ("context deadline exceeded" + // / "Client.Timeout or context cancellation while reading + // body"), the cascade trajectory is left in an inconsistent + // state — the assistant never finished, but the prior + // tool_result is still in there. Restoring this cascade to + // the pool causes the NEXT request to reuse a half-broken + // trajectory, and the model only sees the trailing tool + // result with no earlier user prompts ("I can see the + // content from a previous tool call ... but I don't have + // the earlier conversation context"). + const isDeadline = isUpstreamDeadlineExceeded(err); + if (isDeadline) { + reuseEntryDead = true; } - const sameModelBurst = ctx.__rateLimitEvents.filter(e => e.model === modelKey); - if (sameModelBurst.length >= RL_BURST_THRESHOLD) { - ctx.__rlAborted = true; - log.warn(`Chat[${reqId}] stream: IP-rate-limit burst — ${sameModelBurst.length} accounts rate-limited on ${model} within ${RL_WINDOW_MS}ms. Short-circuiting.`); - const cooldown = Math.max(...sameModelBurst.map(e => e.cooldownMs || IP_RATE_LIMIT_BURST_FLOOR_MS)); - lastErr = Object.assign(new Error(`All accounts temporarily rate-limited on ${model}. Windsurf upstream is applying IP-level cooldown. Wait ~${formatRetryAfter(cooldown)} before retrying.`), { type: 'rate_limit_exceeded', retry_after_ms: cooldown }); + const isAuthFail = /unauthenticated|invalid api key|invalid_grant|permission_denied.*account/i.test(err.message); + const isRateLimit = /rate limit|rate_limit|too many requests|quota/i.test(err.message); + const isInternal = /internal error occurred.*error id/i.test(err.message); + const isTransport = isCascadeTransportError(err); + const isTransient = !isDeadline && isUpstreamTransientError(err, isInternal); + // v2.0.61 (#113) — same policy detection as nonStreamResponse. + const isPolicyBlocked = /cyber\s*verification|content[\s_-]+policy|policy[\s_-]+(?:violation|blocked|denied)|safety[\s_-]+(?:policy|blocked)|prompt[\s_-]+(?:rejected|blocked)\s+by[\s_-]+policy|usage[\s_-]+policy[\s_-]+violation/i.test(err.message); + if (isAuthFail) reportError(currentApiKey); + if (isRateLimit) { recordRateLimited(); markRateLimited(currentApiKey, rateLimitCooldownMs(err.message), modelKey); err.isRateLimit = true; err.isModelError = true; err.kind ||= 'model_error'; } + // v2.0.91 — IP-level rate limit circuit breaker (stream path). + // Same logic as non-stream: ≥3 accounts rate-limited for the + // same model within 8s → Windsurf is doing IP-wide cooldown, + // stop burning accounts and surface immediately. + const ctx = deps.context || {}; + if (isRateLimit && !ctx.__rlAborted) { + if (!ctx.__rateLimitEvents) ctx.__rateLimitEvents = []; + const RL_WINDOW_MS = 8_000; + const RL_BURST_THRESHOLD = 3; + const now = Date.now(); + ctx.__rateLimitEvents.push({ + time: now, + model: modelKey, + account: acct?.id, + cooldownMs: rateLimitBurstCooldownMs({ + message: err.message || '', + retryAfterMs: err.retry_after_ms, + apiKey: currentApiKey, + modelKey, + }), + }); + const cutoff = now - RL_WINDOW_MS; + while (ctx.__rateLimitEvents.length && ctx.__rateLimitEvents[0].time < cutoff) { + ctx.__rateLimitEvents.shift(); + } + const sameModelBurst = ctx.__rateLimitEvents.filter(e => e.model === modelKey); + if (sameModelBurst.length >= RL_BURST_THRESHOLD) { + ctx.__rlAborted = true; + log.warn(`Chat[${reqId}] stream: IP-rate-limit burst — ${sameModelBurst.length} accounts rate-limited on ${model} within ${RL_WINDOW_MS}ms. Short-circuiting.`); + const cooldown = Math.max(...sameModelBurst.map(e => e.cooldownMs || IP_RATE_LIMIT_BURST_FLOOR_MS)); + lastErr = Object.assign(new Error(`All accounts temporarily rate-limited on ${model}. Windsurf upstream is applying IP-level cooldown. Wait ~${formatRetryAfter(cooldown)} before retrying.`), { type: 'rate_limit_exceeded', retry_after_ms: cooldown }); + break; + } + } + if (isInternal) { reportInternalError(currentApiKey); err.isModelError = true; err.kind ||= 'transient_stall'; } + if (isPolicyBlocked) { recordPolicyBlocked(); err.isPolicyBlocked = true; err.isModelError = true; err.kind = 'policy_blocked'; } + if (isTransport) { err.isModelError = true; err.kind ||= 'transient_stall'; } + // v2.0.56 stream-path ban detection — same 2-strike logic as + // non-stream. See nonStreamResponse for rationale. + if (!isRateLimit && looksLikeBanSignal(err.message)) { + reportBanSignal(currentApiKey, err.message); + err.isModelError = true; err.kind ||= 'auth_error'; + } + if (err.isModelError && err.kind !== 'transient_stall' && !isRateLimit && !isInternal) { + updateCapability(currentApiKey, modelKey, false, 'model_error'); + } + if (isRateLimit && strictReuse && checkedOutReuseEntry && fpBefore && checkedOutReuseEntry.apiKey === currentApiKey) { + log.info(`Chat[${reqId}]: strict reuse preserved cascade after rate limit`); break; } - } - if (isInternal) { reportInternalError(currentApiKey); err.isModelError = true; err.kind ||= 'transient_stall'; } - if (isPolicyBlocked) { recordPolicyBlocked(); err.isPolicyBlocked = true; err.isModelError = true; err.kind = 'policy_blocked'; } - if (isTransport) { err.isModelError = true; err.kind ||= 'transient_stall'; } - // v2.0.56 stream-path ban detection — same 2-strike logic as - // non-stream. See nonStreamResponse for rationale. - if (!isRateLimit && looksLikeBanSignal(err.message)) { - reportBanSignal(currentApiKey, err.message); - err.isModelError = true; err.kind ||= 'auth_error'; - } - if (err.isModelError && err.kind !== 'transient_stall' && !isRateLimit && !isInternal) { - updateCapability(currentApiKey, modelKey, false, 'model_error'); - } - if (isRateLimit && strictReuse && checkedOutReuseEntry && fpBefore && checkedOutReuseEntry.apiKey === currentApiKey) { - log.info(`Chat[${reqId}]: strict reuse preserved cascade after rate limit`); - break; - } - // v2.0.61 (#113): policy refusal isn't account-bound, drop - // out of the retry loop immediately and let the SSE error - // path emit a 451-style chunk to the client. - if (isPolicyBlocked) { - log.warn(`Chat[${reqId}] stream: policy_blocked on ${safeKeyRef(currentApiKey, 'apiKey')}, not retrying`); - break; - } - if (isDeadline) { - err.type = 'upstream_deadline_exceeded'; - err.code = 'windsurf_provider_deadline'; - break; - } - // Retry only if nothing has been streamed yet AND it's a retryable error - if (!hadSuccess && (err.isModelError || isRateLimit)) { - if (acct?._sticky && isExperimentalEnabled('stickyNoFallback')) { - const tag = isRateLimit ? 'rate_limit' : isTransient ? 'upstream_transient' : 'model_error'; - log.warn(`Account ${safeAccountRef(acct)} (sticky-bound) failed (${tag}) on ${model}, stickyNoFallback enabled — not trying other accounts`); + // v2.0.61 (#113): policy refusal isn't account-bound, drop + // out of the retry loop immediately and let the SSE error + // path emit a 451-style chunk to the client. + if (isPolicyBlocked) { + log.warn(`Chat[${reqId}] stream: policy_blocked on ${safeKeyRef(currentApiKey, 'apiKey')}, not retrying`); break; } - const tag = isRateLimit ? 'rate_limit' : isTransient ? 'upstream_transient' : 'model_error'; - if (isTransient) { - streamInternalCount++; - const backoffMs = await internalErrorBackoff(streamInternalCount - 1); - log.warn(`Chat[${reqId}] stream: ${safeAccountRef(acct)} upstream transient error (${isTransport ? 'cascade_transport' : 'internal_error'}), waited ${backoffMs}ms before next account`); - } else { - log.warn(`Account ${safeAccountRef(acct)} failed (${tag}) on ${model}, trying next`); + if (isDeadline) { + err.type = 'upstream_deadline_exceeded'; + err.code = 'windsurf_provider_deadline'; + break; } - continue; + // Retry only if nothing has been streamed yet AND it's a retryable error + if (!hadSuccess && (err.isModelError || isRateLimit)) { + if (acct?._sticky && isExperimentalEnabled('stickyNoFallback')) { + const tag = isRateLimit ? 'rate_limit' : isTransient ? 'upstream_transient' : 'model_error'; + log.warn(`Account ${safeAccountRef(acct)} (sticky-bound) failed (${tag}) on ${model}, stickyNoFallback enabled — not trying other accounts`); + break; + } + const tag = isRateLimit ? 'rate_limit' : isTransient ? 'upstream_transient' : 'model_error'; + if (isTransient) { + streamInternalCount++; + const backoffMs = await internalErrorBackoff(streamInternalCount - 1); + log.warn(`Chat[${reqId}] stream: ${safeAccountRef(acct)} upstream transient error (${isTransport ? 'cascade_transport' : 'internal_error'}), waited ${backoffMs}ms before next account`); + } else { + log.warn(`Account ${safeAccountRef(acct)} failed (${tag}) on ${model}, trying next`); + } + continue; + } + break; } - break; - } } finally { // Pair every successful getApiKey/acquireAccountByKey with a // release so the in-flight balancer in auth.js (issue #37) @@ -3995,14 +4050,14 @@ function streamResponse(id, created, model, modelKey, provider, messages, cascad const errMsg = allInternal ? upstreamTransientErrorMessage(model, tried.length, lastIsTransport ? 'cascade_transport' : 'internal_error') : deadlineExceeded - ? upstreamDeadlineExceededMessage(model) - : poolExhausted - ? sanitizeText(lastErr?.message || 'language server pool exhausted') - : temporaryUnavailable.allUnavailable - ? `${model} 所有账号暂时不可用,请 ${Math.ceil(temporaryUnavailable.retryAfterMs / 1000)} 秒后重试` - : rl.allLimited - ? `${model} 所有账号均已达速率限制,请 ${Math.ceil(rl.retryAfterMs / 1000)} 秒后重试` - : sanitizeText(lastErr?.message || 'no accounts'); + ? upstreamDeadlineExceededMessage(model) + : poolExhausted + ? sanitizeText(lastErr?.message || 'language server pool exhausted') + : temporaryUnavailable.allUnavailable + ? `${model} 所有账号暂时不可用,请 ${Math.ceil(temporaryUnavailable.retryAfterMs / 1000)} 秒后重试` + : rl.allLimited + ? `${model} 所有账号均已达速率限制,请 ${Math.ceil(rl.retryAfterMs / 1000)} 秒后重试` + : sanitizeText(lastErr?.message || 'no accounts'); if (allInternal) { log.error(`Chat[${reqId}] stream: ${tried.length}/${tried.length} accounts hit upstream transient error — surfacing upstream_transient_error`); } @@ -4027,15 +4082,15 @@ function streamResponse(id, created, model, modelKey, provider, messages, cascad ? 'upstream_transient_error' : deadlineExceeded ? 'upstream_deadline_exceeded' - : poolExhausted - ? 'ls_pool_exhausted' - : (temporaryUnavailable.allUnavailable || lastErr?.type === 'rate_limit_exceeded') - ? 'rate_limit_exceeded' - : 'upstream_error'; + : poolExhausted + ? 'ls_pool_exhausted' + : (temporaryUnavailable.allUnavailable || lastErr?.type === 'rate_limit_exceeded') + ? 'rate_limit_exceeded' + : 'upstream_error'; send(chatStreamError(errMsg, errType, deadlineExceeded ? 'windsurf_provider_deadline' : null)); } if (!emittedClientPayload) res.write('data: [DONE]\n\n'); - } catch {} + } catch { } if (!res.writableEnded) res.end(); } finally { unregisterSse();