diff --git a/src/dashboard/api.js b/src/dashboard/api.js
index da496b7..684fbb5 100644
--- a/src/dashboard/api.js
+++ b/src/dashboard/api.js
@@ -30,7 +30,7 @@ import { getLogs, subscribeToLogs, unsubscribeFromLogs } from './logger.js';
import { getProxyConfig, getProxyConfigMasked, setGlobalProxy, setAccountProxy, removeProxy, getEffectiveProxy } from './proxy-config.js';
import { MODELS, MODEL_TIER_ACCESS as _TIER_TABLE, getTierModels as _getTierModels } from '../models.js';
import { windsurfLogin, refreshFirebaseToken, reRegisterWithCodeium } from './windsurf-login.js';
-import { getModelAccessConfig, setModelAccessMode, setModelAccessList, addModelToList, removeModelFromList } from './model-access.js';
+import { getModelAccessConfig, setModelAccessMode, setModelAccessList, addModelToList, removeModelFromList, setDefaultModel } from './model-access.js';
import { checkMessageRateLimit } from '../windsurf-api.js';
import { getNativeBridgeConfigStatus } from '../cascade-native-bridge.js';
import { getNativeBridgeStats } from '../native-bridge-stats.js';
@@ -1251,6 +1251,7 @@ export async function handleDashboardApi(method, subpath, body, req, res) {
if (subpath === '/model-access' && method === 'PUT') {
if (body.mode) setModelAccessMode(body.mode);
if (body.list) setModelAccessList(body.list);
+ if (body.defaultModel !== undefined) setDefaultModel(body.defaultModel);
return json(res, 200, { success: true, config: getModelAccessConfig() });
}
@@ -1556,7 +1557,7 @@ async function gitStatus() {
try {
await runGit(['fetch', '--quiet', 'origin']);
remote = (await runGit(['rev-parse', `origin/${branch}`])).trim();
- } catch {}
+ } catch { }
const localMsg = (await runGit(['log', '-1', '--pretty=format:%s'])).trim();
const behind = remote && remote !== commit;
const remoteMsg = behind ? (await runGit(['log', '-1', '--pretty=format:%s', remote]).catch(() => '')).trim() : '';
@@ -1611,21 +1612,21 @@ async function testProxy({ host, port, username, password, type }) {
// TLS handshake + GET to verify the tunnel works
return new Promise((resolve, reject) => {
- const tlsSock = tls.connect({ socket, servername: targetHost, rejectUnauthorized: false }, () => {
- tlsSock.write(`GET / HTTP/1.1\r\nHost: ${targetHost}\r\nConnection: close\r\nUser-Agent: WindsurfAPI/ProxyTest\r\n\r\n`);
- });
- const chunks = [];
- tlsSock.on('data', c => chunks.push(c));
- tlsSock.on('end', () => {
- const body = Buffer.concat(chunks).toString('utf-8');
- const match = body.match(/\r\n\r\n([^\r\n]+)/);
- const ip = match ? match[1].trim() : '';
- tlsSock.destroy();
- if (!ip || !/^\d+\.\d+\.\d+\.\d+$/.test(ip)) {
- return reject(new Error('ERR_TLS_TUNNEL_ERROR'));
- }
- resolve({ egressIp: ip, type });
- });
- tlsSock.on('error', (err) => reject(new Error(`ERR_TLS_FAILED:${err.message}`)));
+ const tlsSock = tls.connect({ socket, servername: targetHost, rejectUnauthorized: false }, () => {
+ tlsSock.write(`GET / HTTP/1.1\r\nHost: ${targetHost}\r\nConnection: close\r\nUser-Agent: WindsurfAPI/ProxyTest\r\n\r\n`);
+ });
+ const chunks = [];
+ tlsSock.on('data', c => chunks.push(c));
+ tlsSock.on('end', () => {
+ const body = Buffer.concat(chunks).toString('utf-8');
+ const match = body.match(/\r\n\r\n([^\r\n]+)/);
+ const ip = match ? match[1].trim() : '';
+ tlsSock.destroy();
+ if (!ip || !/^\d+\.\d+\.\d+\.\d+$/.test(ip)) {
+ return reject(new Error('ERR_TLS_TUNNEL_ERROR'));
+ }
+ resolve({ egressIp: ip, type });
+ });
+ tlsSock.on('error', (err) => reject(new Error(`ERR_TLS_FAILED:${err.message}`)));
});
}
diff --git a/src/dashboard/index.html b/src/dashboard/index.html
index efe6f79..654ddbd 100644
--- a/src/dashboard/index.html
+++ b/src/dashboard/index.html
@@ -2013,6 +2013,11 @@
模型控制
+
+
+
+
当请求的模型不在白名单中时,将使用此模型代替。留空则拒绝白名单外的请求。
+
@@ -4598,6 +4603,7 @@ 控制台登录
this.allModels = modelData.models || [];
this.modelAccessConfig = accessData.mode ? accessData : { mode: 'all', list: [] };
document.querySelector(`input[name="model-mode"][value="${this.modelAccessConfig.mode}"]`).checked = true;
+ document.getElementById('default-model-input').value = this.modelAccessConfig.defaultModel || '';
this.updateModelListUI();
},
@@ -4653,12 +4659,20 @@ 控制台登录
},
async setModelMode(mode) {
- await this.api('PUT', '/model-access', { mode });
+ const defaultModel = document.getElementById('default-model-input').value.trim();
+ await this.api('PUT', '/model-access', { mode, defaultModel });
this.modelAccessConfig.mode = mode;
+ this.modelAccessConfig.defaultModel = defaultModel;
this.updateModelListUI();
this.toast(I18n.t('toast.modeUpdated'));
},
+ async handleDefaultModelInput() {
+ const defaultModel = document.getElementById('default-model-input').value.trim();
+ await this.api('PUT', '/model-access', { defaultModel });
+ this.modelAccessConfig.defaultModel = defaultModel;
+ },
+
async toggleModelInList(modelId) {
const idx = this.modelAccessConfig.list.indexOf(modelId);
if (idx > -1) {
diff --git a/src/dashboard/model-access.js b/src/dashboard/model-access.js
index 0aac738..a47a3c1 100644
--- a/src/dashboard/model-access.js
+++ b/src/dashboard/model-access.js
@@ -14,6 +14,7 @@ const ACCESS_FILE = join(config.dataDir, 'model-access.json');
const _config = {
mode: 'all',
list: [], // model IDs in the list
+ defaultModel: '', // default model to use if the requested one is blocked or not specified; optional, for UI purposes
};
// Load
@@ -60,6 +61,15 @@ export function removeModelFromList(modelId) {
save();
}
+export function setDefaultModel(modelId) {
+ _config.defaultModel = modelId || '';
+ save();
+}
+
+export function getDefaultModel() {
+ return _config.defaultModel || '';
+}
+
/**
* Some models in the catalog are simply the reasoning-mode variant of a
* base model (claude-opus-4.6 vs claude-opus-4.6-thinking). For
diff --git a/src/handlers/chat.js b/src/handlers/chat.js
index 64e431c..7441f23 100644
--- a/src/handlers/chat.js
+++ b/src/handlers/chat.js
@@ -7,7 +7,7 @@ import { createHash, randomUUID } from 'crypto';
import { WindsurfClient, contentToString, isCascadeTransportError } from '../client.js';
import { getApiKey, acquireAccountByKey, releaseAccount, getAccountAvailability, reportError, reportSuccess, markRateLimited, reportInternalError, updateCapability, getAccountList, isAllRateLimited, isAllTemporarilyUnavailable, refundReservation, looksLikeBanSignal, reportBanSignal, clearBanSignals, isModelBlockedByDrought, getDroughtSummary } from '../auth.js';
import { isStickyEnabled, setStickyBinding } from '../account/sticky-session.js';
-import { resolveModel, getModelInfo, pickRateLimitFallback } from '../models.js';
+import { resolveModel, getModelInfo, pickRateLimitFallback, MODELS } from '../models.js';
import { getLsFor, ensureLs } from '../langserver.js';
import { config, log } from '../config.js';
import { safeAccountRef, safeKeyRef } from '../log-safety.js';
@@ -1650,7 +1650,7 @@ async function _handleChatCompletionsInner(body, context = {}) {
const c = typeof m?.content === 'string' ? m.content : Array.isArray(m?.content) ? m.content.map(p => p?.type === 'text' ? p.text : `[${p?.type}]`).join('|') : '';
log.info(`Probe[${reqId}] msg[${mi}] role=${m?.role} ${requestLogSummary(c)}`);
}
- } catch {}
+ } catch { }
// Reject pathologically empty user turns. Without this, an empty
// `user.content` slips through and the model answers against the
@@ -1697,7 +1697,7 @@ async function _handleChatCompletionsInner(body, context = {}) {
return n + (typeof c === 'string' ? c.length : Array.isArray(c) ? c.reduce((k, p) => k + (typeof p?.text === 'string' ? p.text.length : 0), 0) : 0);
}, 0);
if (sysBytes >= 8000) {
- log.warn(`Probe[${reqId}]: large system prompt ${Math.round(sysBytes/1024)}KB — heavy clients (OpenClaw / Cline / opencode) may hit upstream panel-state retries above ~30KB`);
+ log.warn(`Probe[${reqId}]: large system prompt ${Math.round(sysBytes / 1024)}KB — heavy clients (OpenClaw / Cline / opencode) may hit upstream panel-state retries above ~30KB`);
}
}
@@ -1715,25 +1715,35 @@ async function _handleChatCompletionsInner(body, context = {}) {
} else if (wantThinking && isOpus47ModelKey(modelKey) && getModelInfo(modelKey + '-thinking') && !isOpus47ThinkingAutoRouteEnabled()) {
log.warn(`Chat[${reqId}]: Opus 4.7 thinking auto-route disabled; using base model ${modelKey}. Upstream LS rejects ${modelKey}-thinking as model not found. Set WINDSURFAPI_OPUS47_THINKING_UIDS=1 only after upstream registers it.`);
}
- const routingModelKey = effectiveModelKey;
- const modelInfo = getModelInfo(effectiveModelKey) || getModelInfo(modelKey);
+ let routingModelKey = effectiveModelKey;
+ let modelInfo = getModelInfo(effectiveModelKey) || getModelInfo(modelKey);
// Reject unknown models. Without this, chat.js used to fall through to
// legacy rawGetChatMessage with modelEnum=0 and modelUid=null, which
// upstream silently routed to a default model. Callers saw "I'm Claude 4.5"
// when they asked for `claude-4.6` (issue #68), or got blank responses for
// typos. Fail fast with the same shape OpenAI uses.
+ // However, if a default model is configured in model access control,
+ // use that instead of rejecting.
if (!modelInfo) {
- return {
- status: 400,
- body: {
- error: {
- message: `Unsupported model: ${reqModel || config.defaultModel}`,
- type: 'invalid_request_error',
- param: 'model',
- code: 'model_not_found',
+ const { defaultModel } = isModelAllowed(routingModelKey);
+ if (defaultModel) {
+ log.info(`Chat[${reqId}]: model ${routingModelKey} not found in catalog, using default model ${defaultModel}`);
+ routingModelKey = defaultModel;
+ modelInfo = MODELS[routingModelKey];
+ }
+ if (!modelInfo) {
+ return {
+ status: 400,
+ body: {
+ error: {
+ message: `Unsupported model: ${reqModel || config.defaultModel}`,
+ type: 'invalid_request_error',
+ param: 'model',
+ code: 'model_not_found',
+ },
},
- },
- };
+ };
+ }
}
// Return the user's original model name in response.model / response headers
// so external test harnesses (e.g. hvoy.ai "model signature" check) see
@@ -1750,6 +1760,15 @@ async function _handleChatCompletionsInner(body, context = {}) {
return { status: 403, body: { error: { message: access.reason, type: 'model_blocked' } } };
}
+ // If default model is set and the requested model is not in allowlist,
+ // use the default model instead
+ if (access.defaultModel) {
+ log.info(`Chat[${reqId}]: model ${routingModelKey} not in allowlist, using default model ${access.defaultModel}`);
+ routingModelKey = access.defaultModel;
+ modelInfo = MODELS[routingModelKey];
+ displayModel = routingModelKey;
+ }
+
if (isSpecialAgentModelInfo(modelInfo)) {
log.info(`Chat[${reqId}]: routing ${routingModelKey} through special-agent backend`);
return handleSpecialAgentChatCompletion(body, {
@@ -1819,8 +1838,8 @@ async function _handleChatCompletionsInner(body, context = {}) {
: [];
const nativeAllowlist = nativeBridgeOn
? Array.from(new Set(toolPartition.mapped
- .map(t => nativeAllowlistNameForTool(t?.function?.name))
- .filter(Boolean)))
+ .map(t => nativeAllowlistNameForTool(t?.function?.name))
+ .filter(Boolean)))
: [];
// Tools we ship to the emulation toolPreamble: the unmapped subset when
// bridge is on, or the full tools[] when bridge is off (legacy behaviour).
@@ -1941,7 +1960,7 @@ async function _handleChatCompletionsInner(body, context = {}) {
for (const m of (messages || [])) {
const c = typeof m?.content === 'string' ? m.content
: Array.isArray(m?.content) ? m.content.filter(p => p?.type === 'text').map(p => p.text || '').join('\n')
- : '';
+ : '';
const hit = c.match(/[^.\n]{0,40}(?:working directory|cwd||)[^.\n]{0,80}/i);
if (hit) { probe = hit[0].replace(/\s+/g, ' ').slice(0, 160); break; }
}
@@ -2122,19 +2141,19 @@ async function _handleChatCompletionsInner(body, context = {}) {
wantJson,
callerKey,
{
- checkMessageRateLimit: checkMessageRateLimitFn,
- waitForAccount: waitForAccountFn,
- cachePolicy,
- wantThinking,
- fpOpts: buildReuseOpts({ tools: effectiveTools, toolChoice: tool_choice, toolPreamble, preambleTier, emulateTools, route: body.__route || 'chat' }),
- tools: effectiveTools,
- route: body.__route || 'chat',
- nativeOpts,
- context,
- ensureLs: context.ensureLs,
- getLsFor: context.getLsFor,
- WindsurfClient: context.WindsurfClient,
- });
+ checkMessageRateLimit: checkMessageRateLimitFn,
+ waitForAccount: waitForAccountFn,
+ cachePolicy,
+ wantThinking,
+ fpOpts: buildReuseOpts({ tools: effectiveTools, toolChoice: tool_choice, toolPreamble, preambleTier, emulateTools, route: body.__route || 'chat' }),
+ tools: effectiveTools,
+ route: body.__route || 'chat',
+ nativeOpts,
+ context,
+ ensureLs: context.ensureLs,
+ getLsFor: context.getLsFor,
+ WindsurfClient: context.WindsurfClient,
+ });
}
// ── Local response cache (exact body match) ─────────────
@@ -2256,8 +2275,8 @@ async function _handleChatCompletionsInner(body, context = {}) {
const reason = tempUnavail.allUnavailable
? `所有可用账号暂时不可用,请 ${Math.ceil(tempUnavail.retryAfterMs / 1000)} 秒后重试`
: rateLimited.allLimited
- ? `所有可用账号均已达速率限制,请 ${Math.ceil(rateLimited.retryAfterMs / 1000)} 秒后重试`
- : `${Math.ceil(QUEUE_MAX_WAIT_MS / 1000)} 秒内没有账号变为可用 — 账号可能被速率限制或对当前模型无权限`;
+ ? `所有可用账号均已达速率限制,请 ${Math.ceil(rateLimited.retryAfterMs / 1000)} 秒后重试`
+ : `${Math.ceil(QUEUE_MAX_WAIT_MS / 1000)} 秒内没有账号变为可用 — 账号可能被速率限制或对当前模型无权限`;
lastErr = {
status: (tempUnavail.allUnavailable || rateLimited.allLimited) ? 429 : 503,
body: { error: { message: `${displayModel} 账号队列超时: ${reason}`, type: (tempUnavail.allUnavailable || rateLimited.allLimited) ? 'rate_limit_exceeded' : 'pool_exhausted' } },
@@ -2284,179 +2303,179 @@ async function _handleChatCompletionsInner(body, context = {}) {
log.info(`Chat[${reqId}]: native bridge account gate skipped ${safeAccountRef(acct)}`);
continue;
}
- // Pre-flight rate limit check (experimental): ask server.codeium.com if
- // this account still has message capacity before burning an LS round trip.
- if (isExperimentalEnabled('preflightRateLimit')) {
- try {
- const px = getEffectiveProxy(acct.id) || null;
- const rl = await checkMessageRateLimitFn(acct.apiKey, px);
- if (!rl.hasCapacity) {
- log.warn(`Preflight: ${safeAccountRef(acct)} has no capacity (remaining=${rl.messagesRemaining}), skipping`);
- refundReservation(acct.apiKey, acct.reservationTimestamp);
- if (Number.isFinite(rl.retryAfterMs) && rl.retryAfterMs > 0) {
- markRateLimited(acct.apiKey, rl.retryAfterMs, routingModelKey);
- }
- if (!reuseEntryDead && strictReuse && checkedOutReuseEntry && fpBefore && checkedOutReuseEntry.apiKey === acct.apiKey) {
- const availability = getAccountAvailability(acct.apiKey, routingModelKey);
- const retryAfterMs = strictReuseRetryMs(availability);
- poolCheckin(fpBefore, checkedOutReuseEntry, callerKey, ttlHintFromCachePolicy(cachePolicy));
- log.info(`Chat[${reqId}]: strict reuse preserved cascade after preflight rate limit`);
- return {
- status: 429,
- headers: { 'Retry-After': String(Math.ceil(retryAfterMs / 1000)) },
- body: {
- error: {
- message: strictReuseMessage(displayModel, retryAfterMs, availability.reason),
- type: 'rate_limit_exceeded',
- retry_after_ms: retryAfterMs,
+ // Pre-flight rate limit check (experimental): ask server.codeium.com if
+ // this account still has message capacity before burning an LS round trip.
+ if (isExperimentalEnabled('preflightRateLimit')) {
+ try {
+ const px = getEffectiveProxy(acct.id) || null;
+ const rl = await checkMessageRateLimitFn(acct.apiKey, px);
+ if (!rl.hasCapacity) {
+ log.warn(`Preflight: ${safeAccountRef(acct)} has no capacity (remaining=${rl.messagesRemaining}), skipping`);
+ refundReservation(acct.apiKey, acct.reservationTimestamp);
+ if (Number.isFinite(rl.retryAfterMs) && rl.retryAfterMs > 0) {
+ markRateLimited(acct.apiKey, rl.retryAfterMs, routingModelKey);
+ }
+ if (!reuseEntryDead && strictReuse && checkedOutReuseEntry && fpBefore && checkedOutReuseEntry.apiKey === acct.apiKey) {
+ const availability = getAccountAvailability(acct.apiKey, routingModelKey);
+ const retryAfterMs = strictReuseRetryMs(availability);
+ poolCheckin(fpBefore, checkedOutReuseEntry, callerKey, ttlHintFromCachePolicy(cachePolicy));
+ log.info(`Chat[${reqId}]: strict reuse preserved cascade after preflight rate limit`);
+ return {
+ status: 429,
+ headers: { 'Retry-After': String(Math.ceil(retryAfterMs / 1000)) },
+ body: {
+ error: {
+ message: strictReuseMessage(displayModel, retryAfterMs, availability.reason),
+ type: 'rate_limit_exceeded',
+ retry_after_ms: retryAfterMs,
+ },
},
- },
- };
+ };
+ }
+ continue;
}
- continue;
+ } catch (e) {
+ log.debug(`Preflight check failed for ${safeAccountRef(acct)}: ${e.message}`);
+ // Fail open — proceed with the request
}
- } catch (e) {
- log.debug(`Preflight check failed for ${safeAccountRef(acct)}: ${e.message}`);
- // Fail open — proceed with the request
}
- }
- try { await ensureLsFn(acct.proxy); } catch (e) {
- lastErr = isLsPoolExhausted(e) ? lsPoolExhaustedResponse(e) : { status: e.status || 503, body: { error: { message: e.message || String(e), type: e.type || 'ls_unavailable' } } };
- break;
- }
- const ls = getLsForFn(acct.proxy);
- if (!ls) { lastErr = { status: 503, body: { error: { message: 'No LS instance available', type: 'ls_unavailable' } } }; break; }
- // Cascade pins cascade_id to a specific LS port too; if the LS it was
- // born on has been replaced, the cascade_id is dead.
- if (reuseEntry && reuseEntry.lsPort !== ls.port) {
- log.info(`Chat[${reqId}]: reuse MISS — LS port changed`);
- checkedOutReuseEntry = null;
- reuseEntry = null;
- }
- const _msgChars = (messages || []).reduce((n, m) => {
- const c = m?.content;
- return n + (typeof c === 'string' ? c.length : Array.isArray(c) ? c.reduce((k, p) => k + (typeof p?.text === 'string' ? p.text.length : 0), 0) : 0);
- }, 0);
- log.info(`Chat[${reqId}]: model=${displayModel} flow=${useCascade ? 'cascade' : 'legacy'} attempt=${attempt + 1} ${safeAccountRef(acct)} ls=${ls.port} turns=${(messages||[]).length} chars=${_msgChars}${reuseEntry ? ' reuse=1' : ''}${emulateTools ? ' tools=emu' : ''}`);
- const client = new WindsurfClientClass(acct.apiKey, ls.port, ls.csrfToken);
- const result = await nonStreamResponse(
- client, chatId, created, displayModel, routingModelKey, messages, cascadeMessages, modelEnum, modelUid,
- useCascade, acct.apiKey, ckey,
- // v2.0.87 (#129) — aliasModelKey is set by the outer wrapper
- // when this handler is the second pass of an auto-fallback
- // retry; it carries the ORIGINAL model name the client asked
- // for so the cascade pool entry gets indexed under both keys.
- reuseEnabled ? { reuseEntry, lsPort: ls.port, apiKey: acct.apiKey, accountId: acct.id, callerKey, cachePolicy, fpOpts, aliasModelKey: context.__aliasModelKey || null } : null,
- modelInfo?.provider || null,
- emulateTools, toolPreamble, wantJson, cachePolicy, wantThinking, tools, body.__route || 'chat',
- nativeOpts,
- // v2.0.88 (audit H-3) — when this is the inner-pass of an auto-
- // fallback retry, the outer wrapper threads the ORIGINAL request's
- // ckey through context.__originalCkey. We pass it down so the
- // success-path cacheSet writes under the original key as well —
- // next identical original-model request hits cache instead of
- // re-burning the rate-limit + fallback cycle.
- reqId,
- context.__originalCkey || null,
- );
- if (result.status === 200) return result;
- reuseEntry = null; // don't try to reuse on the retry
- if (result.reuseEntryInvalid) reuseEntryDead = true;
- // #101: same upstream-timeout invalidation as the stream path —
- // see the matching catch block in streamResponse for the full
- // rationale (cascade trajectory left half-broken, next reuse hits
- // it and the model "loses" the prior conversation).
- const _resultError = result.body?.error || {};
- const _resultMsg = String(_resultError.upstream_message || _resultError.message || '');
- if (
- result.status === 504
- || _resultError.type === 'upstream_deadline_exceeded'
- || _resultError.code === 'windsurf_provider_deadline'
- || isUpstreamDeadlineExceeded(_resultMsg)
- ) {
- reuseEntryDead = true;
- }
- lastErr = result;
- const errType = result.body?.error?.type;
- // v2.0.61 (#113): policy_blocked → don't rotate accounts, return
- // immediately. The model refused the request, swapping accounts
- // gives the same refusal but burns more quota.
- if (errType === 'policy_blocked') { recordPolicyBlocked(); return result; }
- // Rate limit: this account is done for this model, try the next one
- if (errType === 'rate_limit_exceeded') {
- recordRateLimited();
- // v2.0.91 — IP-level circuit breaker: when Windsurf upstream
- // rate-limits several accounts for the same model in a tight
- // window, it's usually IP-wide cooldown, not per-account.
- // Burning through all 40+ accounts just marks them all dead
- // for 30 min. Detect and short-circuit.
- if (!context.__rateLimitEvents) context.__rateLimitEvents = [];
- const RL_WINDOW_MS = 8_000;
- const RL_BURST_THRESHOLD = 3;
- context.__rateLimitEvents.push({
- time: Date.now(),
- model: routingModelKey,
- account: acct.id,
- cooldownMs: rateLimitBurstCooldownMs({
- message: result.body?.error?.message || '',
- retryAfterMs: result.body?.error?.retry_after_ms,
- apiKey: acct.apiKey,
- modelKey: routingModelKey,
- }),
- });
- // Prune old events
- const cutoff = Date.now() - RL_WINDOW_MS;
- while (context.__rateLimitEvents.length && context.__rateLimitEvents[0].time < cutoff) {
- context.__rateLimitEvents.shift();
+ try { await ensureLsFn(acct.proxy); } catch (e) {
+ lastErr = isLsPoolExhausted(e) ? lsPoolExhaustedResponse(e) : { status: e.status || 503, body: { error: { message: e.message || String(e), type: e.type || 'ls_unavailable' } } };
+ break;
}
- const sameModelBurst = context.__rateLimitEvents.filter(e => e.model === routingModelKey);
- if (sameModelBurst.length >= RL_BURST_THRESHOLD) {
- const maxCooldown = Math.max(...sameModelBurst.map(e => e.cooldownMs || IP_RATE_LIMIT_BURST_FLOOR_MS));
- log.warn(`Chat[${reqId}]: IP-rate-limit burst detected — ${sameModelBurst.length} accounts rate-limited on ${displayModel} within ${RL_WINDOW_MS}ms. Short-circuiting.`);
- return {
- status: 429,
- headers: { 'Retry-After': String(Math.ceil(maxCooldown / 1000)) },
- body: {
- error: {
- message: `All accounts temporarily rate-limited on ${displayModel}. Windsurf upstream is applying IP-level cooldown. Wait ${formatRetryAfter(maxCooldown)} before retrying, or switch to a different model.`,
- type: 'rate_limit_exceeded',
- retry_after_ms: maxCooldown,
+ const ls = getLsForFn(acct.proxy);
+ if (!ls) { lastErr = { status: 503, body: { error: { message: 'No LS instance available', type: 'ls_unavailable' } } }; break; }
+ // Cascade pins cascade_id to a specific LS port too; if the LS it was
+ // born on has been replaced, the cascade_id is dead.
+ if (reuseEntry && reuseEntry.lsPort !== ls.port) {
+ log.info(`Chat[${reqId}]: reuse MISS — LS port changed`);
+ checkedOutReuseEntry = null;
+ reuseEntry = null;
+ }
+ const _msgChars = (messages || []).reduce((n, m) => {
+ const c = m?.content;
+ return n + (typeof c === 'string' ? c.length : Array.isArray(c) ? c.reduce((k, p) => k + (typeof p?.text === 'string' ? p.text.length : 0), 0) : 0);
+ }, 0);
+ log.info(`Chat[${reqId}]: model=${displayModel} flow=${useCascade ? 'cascade' : 'legacy'} attempt=${attempt + 1} ${safeAccountRef(acct)} ls=${ls.port} turns=${(messages || []).length} chars=${_msgChars}${reuseEntry ? ' reuse=1' : ''}${emulateTools ? ' tools=emu' : ''}`);
+ const client = new WindsurfClientClass(acct.apiKey, ls.port, ls.csrfToken);
+ const result = await nonStreamResponse(
+ client, chatId, created, displayModel, routingModelKey, messages, cascadeMessages, modelEnum, modelUid,
+ useCascade, acct.apiKey, ckey,
+ // v2.0.87 (#129) — aliasModelKey is set by the outer wrapper
+ // when this handler is the second pass of an auto-fallback
+ // retry; it carries the ORIGINAL model name the client asked
+ // for so the cascade pool entry gets indexed under both keys.
+ reuseEnabled ? { reuseEntry, lsPort: ls.port, apiKey: acct.apiKey, accountId: acct.id, callerKey, cachePolicy, fpOpts, aliasModelKey: context.__aliasModelKey || null } : null,
+ modelInfo?.provider || null,
+ emulateTools, toolPreamble, wantJson, cachePolicy, wantThinking, tools, body.__route || 'chat',
+ nativeOpts,
+ // v2.0.88 (audit H-3) — when this is the inner-pass of an auto-
+ // fallback retry, the outer wrapper threads the ORIGINAL request's
+ // ckey through context.__originalCkey. We pass it down so the
+ // success-path cacheSet writes under the original key as well —
+ // next identical original-model request hits cache instead of
+ // re-burning the rate-limit + fallback cycle.
+ reqId,
+ context.__originalCkey || null,
+ );
+ if (result.status === 200) return result;
+ reuseEntry = null; // don't try to reuse on the retry
+ if (result.reuseEntryInvalid) reuseEntryDead = true;
+ // #101: same upstream-timeout invalidation as the stream path —
+ // see the matching catch block in streamResponse for the full
+ // rationale (cascade trajectory left half-broken, next reuse hits
+ // it and the model "loses" the prior conversation).
+ const _resultError = result.body?.error || {};
+ const _resultMsg = String(_resultError.upstream_message || _resultError.message || '');
+ if (
+ result.status === 504
+ || _resultError.type === 'upstream_deadline_exceeded'
+ || _resultError.code === 'windsurf_provider_deadline'
+ || isUpstreamDeadlineExceeded(_resultMsg)
+ ) {
+ reuseEntryDead = true;
+ }
+ lastErr = result;
+ const errType = result.body?.error?.type;
+ // v2.0.61 (#113): policy_blocked → don't rotate accounts, return
+ // immediately. The model refused the request, swapping accounts
+ // gives the same refusal but burns more quota.
+ if (errType === 'policy_blocked') { recordPolicyBlocked(); return result; }
+ // Rate limit: this account is done for this model, try the next one
+ if (errType === 'rate_limit_exceeded') {
+ recordRateLimited();
+ // v2.0.91 — IP-level circuit breaker: when Windsurf upstream
+ // rate-limits several accounts for the same model in a tight
+ // window, it's usually IP-wide cooldown, not per-account.
+ // Burning through all 40+ accounts just marks them all dead
+ // for 30 min. Detect and short-circuit.
+ if (!context.__rateLimitEvents) context.__rateLimitEvents = [];
+ const RL_WINDOW_MS = 8_000;
+ const RL_BURST_THRESHOLD = 3;
+ context.__rateLimitEvents.push({
+ time: Date.now(),
+ model: routingModelKey,
+ account: acct.id,
+ cooldownMs: rateLimitBurstCooldownMs({
+ message: result.body?.error?.message || '',
+ retryAfterMs: result.body?.error?.retry_after_ms,
+ apiKey: acct.apiKey,
+ modelKey: routingModelKey,
+ }),
+ });
+ // Prune old events
+ const cutoff = Date.now() - RL_WINDOW_MS;
+ while (context.__rateLimitEvents.length && context.__rateLimitEvents[0].time < cutoff) {
+ context.__rateLimitEvents.shift();
+ }
+ const sameModelBurst = context.__rateLimitEvents.filter(e => e.model === routingModelKey);
+ if (sameModelBurst.length >= RL_BURST_THRESHOLD) {
+ const maxCooldown = Math.max(...sameModelBurst.map(e => e.cooldownMs || IP_RATE_LIMIT_BURST_FLOOR_MS));
+ log.warn(`Chat[${reqId}]: IP-rate-limit burst detected — ${sameModelBurst.length} accounts rate-limited on ${displayModel} within ${RL_WINDOW_MS}ms. Short-circuiting.`);
+ return {
+ status: 429,
+ headers: { 'Retry-After': String(Math.ceil(maxCooldown / 1000)) },
+ body: {
+ error: {
+ message: `All accounts temporarily rate-limited on ${displayModel}. Windsurf upstream is applying IP-level cooldown. Wait ${formatRetryAfter(maxCooldown)} before retrying, or switch to a different model.`,
+ type: 'rate_limit_exceeded',
+ retry_after_ms: maxCooldown,
+ },
},
- },
- };
+ };
+ }
+ if (acct?._sticky && isExperimentalEnabled('stickyNoFallback')) {
+ log.warn(`Account ${safeAccountRef(acct)} (sticky-bound) rate-limited on ${displayModel}, stickyNoFallback enabled — not trying other accounts`);
+ break;
+ }
+ log.warn(`Account ${safeAccountRef(acct)} rate-limited on ${displayModel}, trying next account`);
+ continue;
}
- if (acct?._sticky && isExperimentalEnabled('stickyNoFallback')) {
- log.warn(`Account ${safeAccountRef(acct)} (sticky-bound) rate-limited on ${displayModel}, stickyNoFallback enabled — not trying other accounts`);
+ // Cascade transient 错误通常是上游或本地 LS 短暂抖动,先退避再切账号,避免连续打爆同一热窗口。
+ if (errType === 'upstream_deadline_exceeded') {
break;
}
- log.warn(`Account ${safeAccountRef(acct)} rate-limited on ${displayModel}, trying next account`);
- continue;
- }
- // Cascade transient 错误通常是上游或本地 LS 短暂抖动,先退避再切账号,避免连续打爆同一热窗口。
- if (errType === 'upstream_deadline_exceeded') {
- break;
- }
- if (errType === 'upstream_internal_error' || errType === 'upstream_transient_error') {
- if (acct?._sticky && isExperimentalEnabled('stickyNoFallback')) {
- log.warn(`Chat[${reqId}]: ${safeAccountRef(acct)} (sticky-bound) upstream transient error, stickyNoFallback enabled — not trying other accounts`);
- break;
+ if (errType === 'upstream_internal_error' || errType === 'upstream_transient_error') {
+ if (acct?._sticky && isExperimentalEnabled('stickyNoFallback')) {
+ log.warn(`Chat[${reqId}]: ${safeAccountRef(acct)} (sticky-bound) upstream transient error, stickyNoFallback enabled — not trying other accounts`);
+ break;
+ }
+ internalCount++;
+ const backoffMs = await internalErrorBackoff(internalCount - 1);
+ log.warn(`Chat[${reqId}]: ${safeAccountRef(acct)} upstream transient error, waited ${backoffMs}ms before next account`);
+ continue;
}
- internalCount++;
- const backoffMs = await internalErrorBackoff(internalCount - 1);
- log.warn(`Chat[${reqId}]: ${safeAccountRef(acct)} upstream transient error, waited ${backoffMs}ms before next account`);
- continue;
- }
- // Model not available on this account (permission_denied, etc.)
- if (errType === 'model_not_available') {
- if (acct?._sticky && isExperimentalEnabled('stickyNoFallback')) {
- log.warn(`Account ${safeAccountRef(acct)} (sticky-bound) cannot serve ${displayModel}, stickyNoFallback enabled — not trying other accounts`);
- break;
+ // Model not available on this account (permission_denied, etc.)
+ if (errType === 'model_not_available') {
+ if (acct?._sticky && isExperimentalEnabled('stickyNoFallback')) {
+ log.warn(`Account ${safeAccountRef(acct)} (sticky-bound) cannot serve ${displayModel}, stickyNoFallback enabled — not trying other accounts`);
+ break;
+ }
+ log.warn(`Account ${safeAccountRef(acct)} cannot serve ${displayModel}, trying next account`);
+ continue;
}
- log.warn(`Account ${safeAccountRef(acct)} cannot serve ${displayModel}, trying next account`);
- continue;
- }
- break; // other errors (502, transport) — don't retry
+ break; // other errors (502, transport) — don't retry
} finally {
// Pair every successful getApiKey/acquireAccountByKey with a release
// so the in-flight-count based balancer in auth.js (issue #37) stays
@@ -2728,12 +2747,12 @@ async function nonStreamResponse(client, id, created, model, modelKey, messages,
// WINDSURFAPI_NLU_RETRY=0 to disable globally.
const nluRetryEnabled = process.env.WINDSURFAPI_NLU_RETRY !== '0'
&& (process.env.WINDSURFAPI_NLU_RETRY === '1'
- || /zhipu|glm|moonshot|kimi/i.test(String(provider || ''))
- || /^(?:glm|kimi)/i.test(String(modelKey || '')));
+ || /zhipu|glm|moonshot|kimi/i.test(String(provider || ''))
+ || /^(?:glm|kimi)/i.test(String(modelKey || '')));
if (toolCalls.length === 0
- && nluRetryEnabled
- && Array.isArray(tools) && tools.length > 0
- && narrativeSource) {
+ && nluRetryEnabled
+ && Array.isArray(tools) && tools.length > 0
+ && narrativeSource) {
const lastUser = latestRealUserText(messages) || '';
const intendedTool = detectToolIntentInNarrative(narrativeSource, tools, { lastUserText: lastUser });
if (intendedTool) {
@@ -2746,12 +2765,14 @@ async function nonStreamResponse(client, id, created, model, modelKey, messages,
const correctionMessages = [
...cascadeMessages,
{ role: 'assistant', content: narrativeSource.slice(0, 4000) },
- { role: 'user', content:
- `Your previous response described intending to call \`${intendedTool}\` but didn't emit the tool-call protocol block. ` +
- `Re-emit the call now using the EXACT protocol format defined at the top of this conversation. ` +
- `Do NOT narrate. Do NOT describe. Just the protocol block. ` +
- `Provide a concrete argument value (the literal command / file path / query) — never placeholders like "command" or "the file". ` +
- `\n\n你刚才描述了想用 \`${intendedTool}\` 工具但没按协议格式 emit。请直接重新 emit 协议块,不要 narrate。给具体的 argument 字面值(如 ls / /etc/hostname / "echo hi"),不要写"命令" / "文件" 这种占位词。` },
+ {
+ role: 'user', content:
+ `Your previous response described intending to call \`${intendedTool}\` but didn't emit the tool-call protocol block. ` +
+ `Re-emit the call now using the EXACT protocol format defined at the top of this conversation. ` +
+ `Do NOT narrate. Do NOT describe. Just the protocol block. ` +
+ `Provide a concrete argument value (the literal command / file path / query) — never placeholders like "command" or "the file". ` +
+ `\n\n你刚才描述了想用 \`${intendedTool}\` 工具但没按协议格式 emit。请直接重新 emit 协议块,不要 narrate。给具体的 argument 字面值(如 ls / /etc/hostname / "echo hi"),不要写"命令" / "文件" 这种占位词。`
+ },
];
log.info(`Chat[non-stream]: NLU retry — first pass narrate-only, retrying with correction (tool=${intendedTool} markers=${markers.join(',') || 'none'})`);
const retryChunks = await client.cascadeChat(correctionMessages, modelEnum, modelUid, {
@@ -2930,9 +2951,9 @@ async function nonStreamResponse(client, id, created, model, modelKey, messages,
// v2.0.91 — kimi-k2 upstream outage. Cascade returns idle_empty
// (null content, ~1-6 tokens). Return clear error with alternatives.
if (/^kimi/i.test(String(modelKey || ''))
- && !toolCalls.length
- && (!allText || allText.trim().length === 0)
- && (!allThinking || allThinking.trim().length === 0)) {
+ && !toolCalls.length
+ && (!allText || allText.trim().length === 0)
+ && (!allThinking || allThinking.trim().length === 0)) {
return {
status: 502,
body: {
@@ -2979,7 +3000,7 @@ async function nonStreamResponse(client, id, created, model, modelKey, messages,
const usage = buildUsageBody(serverUsage, messages, allText, allThinking, cachePolicy);
// v2.0.69 (#118): feed bucket totals into stats so dashboard can show
// fresh_input vs cache_read vs cache_write breakdown.
- try { recordTokenUsage(usage); } catch {}
+ try { recordTokenUsage(usage); } catch { }
const finishReason = toolCalls.length ? 'tool_calls' : 'stop';
return {
status: 200,
@@ -3058,10 +3079,12 @@ async function nonStreamResponse(client, id, created, model, modelKey, messages,
if (chars > 500_000) {
return {
status: 413,
- body: { error: {
- message: `请求过大(${Math.round(chars / 1024)}KB 输入)导致语言服务器中断。请尝试:1) 分块发送;2) 先用摘要/summarization 预处理 PDF;3) 减少历史轮数`,
- type: 'payload_too_large',
- } },
+ body: {
+ error: {
+ message: `请求过大(${Math.round(chars / 1024)}KB 输入)导致语言服务器中断。请尝试:1) 分块发送;2) 先用摘要/summarization 预处理 PDF;3) 减少历史轮数`,
+ type: 'payload_too_large',
+ }
+ },
};
}
}
@@ -3082,12 +3105,14 @@ async function nonStreamResponse(client, id, created, model, modelKey, messages,
return {
status: isTransient ? 502 : (err.isModelError ? 403 : 502),
reuseEntryInvalid: !!err.reuseEntryInvalid,
- body: { error: {
- message: isTransient
- ? upstreamTransientErrorMessage(model, 1, isTransport ? 'cascade_transport' : 'internal_error')
- : sanitizeText(err.message),
- type: isTransient ? 'upstream_transient_error' : (err.isModelError ? 'model_not_available' : 'upstream_error'),
- } },
+ body: {
+ error: {
+ message: isTransient
+ ? upstreamTransientErrorMessage(model, 1, isTransport ? 'cascade_transport' : 'internal_error')
+ : sanitizeText(err.message),
+ type: isTransient ? 'upstream_transient_error' : (err.isModelError ? 'model_not_available' : 'upstream_error'),
+ }
+ },
};
}
}
@@ -3132,7 +3157,7 @@ function streamResponse(id, created, model, modelKey, provider, messages, cascad
},
async handler(res) {
const abortController = new AbortController();
- let unregisterSse = () => {};
+ let unregisterSse = () => { };
res.on('close', () => {
if (!res.writableEnded) {
log.info('Client disconnected mid-stream, aborting upstream');
@@ -3169,20 +3194,30 @@ function streamResponse(id, created, model, modelKey, provider, messages, cascad
log.info(`Chat: cache HIT model=${model} flow=stream`);
recordRequest(model, true, 0, null);
try {
- send({ id, object: 'chat.completion.chunk', created, model,
- choices: [{ index: 0, delta: { role: 'assistant', content: '' }, finish_reason: null }] });
+ send({
+ id, object: 'chat.completion.chunk', created, model,
+ choices: [{ index: 0, delta: { role: 'assistant', content: '' }, finish_reason: null }]
+ });
if (cached.thinking) {
- send({ id, object: 'chat.completion.chunk', created, model,
- choices: [{ index: 0, delta: { reasoning_content: cached.thinking }, finish_reason: null }] });
+ send({
+ id, object: 'chat.completion.chunk', created, model,
+ choices: [{ index: 0, delta: { reasoning_content: cached.thinking }, finish_reason: null }]
+ });
}
if (cached.text) {
- send({ id, object: 'chat.completion.chunk', created, model,
- choices: [{ index: 0, delta: { content: cached.text }, finish_reason: null }] });
+ send({
+ id, object: 'chat.completion.chunk', created, model,
+ choices: [{ index: 0, delta: { content: cached.text }, finish_reason: null }]
+ });
}
- send({ id, object: 'chat.completion.chunk', created, model,
- choices: [{ index: 0, delta: {}, finish_reason: 'stop' }] });
- send({ id, object: 'chat.completion.chunk', created, model,
- choices: [], usage: cachedUsage(messages, cached.text) });
+ send({
+ id, object: 'chat.completion.chunk', created, model,
+ choices: [{ index: 0, delta: {}, finish_reason: 'stop' }]
+ });
+ send({
+ id, object: 'chat.completion.chunk', created, model,
+ choices: [], usage: cachedUsage(messages, cached.text)
+ });
if (!res.writableEnded) { res.write('data: [DONE]\n\n'); res.end(); }
} finally {
unregisterSse();
@@ -3203,11 +3238,11 @@ function streamResponse(id, created, model, modelKey, provider, messages, cascad
// every attempt hit it.
let streamInternalCount = 0;
// Dynamic: try every active account in the pool (capped at 10) so a
- // large pool with many rate-limited accounts can still fall through
- // to a free one. Was hardcoded 3 — in pools bigger than 3 with the
- // first accounts rate-limited, healthy accounts were never reached
- // even though they would have worked (issue #5).
- const maxAttempts = Math.min(10, Math.max(3, getAccountList().filter(a => a.status === 'active').length));
+ // large pool with many rate-limited accounts can still fall through
+ // to a free one. Was hardcoded 3 — in pools bigger than 3 with the
+ // first accounts rate-limited, healthy accounts were never reached
+ // even though they would have worked (issue #5).
+ const maxAttempts = Math.min(10, Math.max(3, getAccountList().filter(a => a.status === 'active').length));
// Accumulate chunks so we can cache a successful response at the end.
let accText = '';
@@ -3296,28 +3331,36 @@ function streamResponse(id, created, model, modelKey, provider, messages, cascad
// lookahead). On finish we'll emit one clean JSON payload.
if (wantJson) return;
emittedClientPayload = true;
- send({ id, object: 'chat.completion.chunk', created, model,
- choices: [{ index: 0, delta: { content: clean }, finish_reason: null }] });
+ send({
+ id, object: 'chat.completion.chunk', created, model,
+ choices: [{ index: 0, delta: { content: clean }, finish_reason: null }]
+ });
};
const emitThinking = (clean) => {
if (!clean) return;
accThinking += clean;
emittedClientPayload = true;
- send({ id, object: 'chat.completion.chunk', created, model,
- choices: [{ index: 0, delta: { reasoning_content: clean }, finish_reason: null }] });
+ send({
+ id, object: 'chat.completion.chunk', created, model,
+ choices: [{ index: 0, delta: { reasoning_content: clean }, finish_reason: null }]
+ });
};
const emitToolCallDelta = (tc, idx) => {
emittedClientPayload = true;
- send({ id, object: 'chat.completion.chunk', created, model,
- choices: [{ index: 0, delta: {
- tool_calls: [{
- index: idx,
- id: tc.id,
- type: 'function',
- function: { name: tc.name, arguments: sanitizeText(tc.argumentsJson || '{}') },
- }],
- }, finish_reason: null }] });
+ send({
+ id, object: 'chat.completion.chunk', created, model,
+ choices: [{
+ index: 0, delta: {
+ tool_calls: [{
+ index: idx,
+ id: tc.id,
+ type: 'function',
+ function: { name: tc.name, arguments: sanitizeText(tc.argumentsJson || '{}') },
+ }],
+ }, finish_reason: null
+ }]
+ });
};
const emitNativeFallbackCalls = (rawCalls) => {
@@ -3337,8 +3380,10 @@ function streamResponse(id, created, model, modelKey, provider, messages, cascad
const onChunk = (chunk) => {
if (!rolePrinted) {
rolePrinted = true;
- send({ id, object: 'chat.completion.chunk', created, model,
- choices: [{ index: 0, delta: { role: 'assistant', content: '' }, finish_reason: null }] });
+ send({
+ id, object: 'chat.completion.chunk', created, model,
+ choices: [{ index: 0, delta: { role: 'assistant', content: '' }, finish_reason: null }]
+ });
}
hadSuccess = true;
@@ -3353,7 +3398,7 @@ function streamResponse(id, created, model, modelKey, provider, messages, cascad
recordNativeBridgeCascadeToolCall(raw.name);
}
if (isCompletedReadUrlNativeResult(raw)
- && !completedNativeReadUrlResults.some(existing => (existing.id || '') === (raw.id || ''))) {
+ && !completedNativeReadUrlResults.some(existing => (existing.id || '') === (raw.id || ''))) {
completedNativeReadUrlResults.push(raw);
}
return;
@@ -3522,8 +3567,8 @@ function streamResponse(id, created, model, modelKey, provider, messages, cascad
const reason = tempUnavail.allUnavailable
? `所有可用账号暂时不可用,请 ${Math.ceil(tempUnavail.retryAfterMs / 1000)} 秒后重试`
: rateLimited.allLimited
- ? `所有可用账号均已达速率限制,请 ${Math.ceil(rateLimited.retryAfterMs / 1000)} 秒后重试`
- : `${Math.ceil(QUEUE_MAX_WAIT_MS / 1000)} 秒内没有账号变为可用 — 账号可能被速率限制或对当前模型无权限`;
+ ? `所有可用账号均已达速率限制,请 ${Math.ceil(rateLimited.retryAfterMs / 1000)} 秒后重试`
+ : `${Math.ceil(QUEUE_MAX_WAIT_MS / 1000)} 秒内没有账号变为可用 — 账号可能被速率限制或对当前模型无权限`;
lastErr = Object.assign(
new Error(`${model} 账号队列超时: ${reason}`),
{ type: (tempUnavail.allUnavailable || rateLimited.allLimited) ? 'rate_limit_exceeded' : 'pool_exhausted' }
@@ -3551,428 +3596,438 @@ function streamResponse(id, created, model, modelKey, provider, messages, cascad
log.info(`Chat[${reqId}]: native bridge account gate skipped ${safeAccountRef(acct)}`);
continue;
}
- // Pre-flight rate limit check (experimental)
- if (isExperimentalEnabled('preflightRateLimit')) {
- try {
- const px = getEffectiveProxy(acct.id) || null;
- const rl = await checkMessageRateLimitFn(acct.apiKey, px);
- if (!rl.hasCapacity) {
- log.warn(`Preflight: ${safeAccountRef(acct)} has no capacity (remaining=${rl.messagesRemaining}), skipping`);
- refundReservation(acct.apiKey, acct.reservationTimestamp);
- if (Number.isFinite(rl.retryAfterMs) && rl.retryAfterMs > 0) {
- markRateLimited(acct.apiKey, rl.retryAfterMs, modelKey);
- }
- if (!reuseEntryDead && strictReuse && checkedOutReuseEntry && fpBefore && checkedOutReuseEntry.apiKey === acct.apiKey) {
- const availability = getAccountAvailability(acct.apiKey, modelKey);
- const retryAfterMs = strictReuseRetryMs(availability);
- lastErr = Object.assign(
- new Error(strictReuseMessage(model, retryAfterMs, availability.reason)),
- { type: 'rate_limit_exceeded' }
- );
- log.info(`Chat[${reqId}]: strict reuse preserved cascade after preflight rate limit`);
- break;
+ // Pre-flight rate limit check (experimental)
+ if (isExperimentalEnabled('preflightRateLimit')) {
+ try {
+ const px = getEffectiveProxy(acct.id) || null;
+ const rl = await checkMessageRateLimitFn(acct.apiKey, px);
+ if (!rl.hasCapacity) {
+ log.warn(`Preflight: ${safeAccountRef(acct)} has no capacity (remaining=${rl.messagesRemaining}), skipping`);
+ refundReservation(acct.apiKey, acct.reservationTimestamp);
+ if (Number.isFinite(rl.retryAfterMs) && rl.retryAfterMs > 0) {
+ markRateLimited(acct.apiKey, rl.retryAfterMs, modelKey);
+ }
+ if (!reuseEntryDead && strictReuse && checkedOutReuseEntry && fpBefore && checkedOutReuseEntry.apiKey === acct.apiKey) {
+ const availability = getAccountAvailability(acct.apiKey, modelKey);
+ const retryAfterMs = strictReuseRetryMs(availability);
+ lastErr = Object.assign(
+ new Error(strictReuseMessage(model, retryAfterMs, availability.reason)),
+ { type: 'rate_limit_exceeded' }
+ );
+ log.info(`Chat[${reqId}]: strict reuse preserved cascade after preflight rate limit`);
+ break;
+ }
+ continue;
}
- continue;
+ } catch (e) {
+ log.debug(`Preflight check failed for ${safeAccountRef(acct)}: ${e.message}`);
}
- } catch (e) {
- log.debug(`Preflight check failed for ${safeAccountRef(acct)}: ${e.message}`);
}
- }
- try { await ensureLsFn(acct.proxy); } catch (e) {
- lastErr = isLsPoolExhausted(e)
- ? Object.assign(new Error(e.message), { type: 'ls_pool_exhausted', status: e.status || 503 })
- : e;
- break;
- }
- const ls = getLsForFn(acct.proxy);
- if (!ls) { lastErr = new Error('No LS instance available'); break; }
- if (reuseEntry && reuseEntry.lsPort !== ls.port) {
- log.info(`Chat[${reqId}]: reuse MISS — LS port changed`);
- checkedOutReuseEntry = null;
- reuseEntry = null;
- }
- const _msgCharsStream = (messages || []).reduce((n, m) => {
- const c = m?.content;
- return n + (typeof c === 'string' ? c.length : Array.isArray(c) ? c.reduce((k, p) => k + (typeof p?.text === 'string' ? p.text.length : 0), 0) : 0);
- }, 0);
- log.info(`Chat: model=${model} flow=${useCascade ? 'cascade' : 'legacy'} stream=true attempt=${attempt + 1} ${safeAccountRef(acct)} ls=${ls.port} turns=${(messages||[]).length} chars=${_msgCharsStream}${reuseEntry ? ' reuse=1' : ''}`);
- const client = new ClientClass(acct.apiKey, ls.port, ls.csrfToken);
- let cascadeResult = null;
- try {
- if (useCascade) {
- cascadeResult = await client.cascadeChat(cascadeMessages, modelEnum, modelUid, {
- onChunk, signal: abortController.signal, reuseEntry,
- toolPreamble: nativeBridgeOn ? '' : toolPreamble,
- nativeEnvironment: nativeBridgeOn ? (nativeOpts?.environment || '') : '',
- displayModel: model,
- nativeMode: nativeBridgeOn,
- nativeAllowlist: nativeOpts?.allowlist || null,
- additionalSteps: nativeOpts?.additionalSteps || null,
- });
- } else {
- await client.rawGetChatMessage(messages, modelEnum, modelUid, { onChunk });
+ try { await ensureLsFn(acct.proxy); } catch (e) {
+ lastErr = isLsPoolExhausted(e)
+ ? Object.assign(new Error(e.message), { type: 'ls_pool_exhausted', status: e.status || 503 })
+ : e;
+ break;
}
- // Flush order matters:
- // 1. NativeFunctionCallStreamParser tail → may produce provider
- // native tool_calls withheld from content deltas
- // 2. ToolCallStreamParser tail → may produce more text deltas
- // (e.g., a dangling that never closed falls
- // through as literal text)
- // 3. PathSanitizeStream tail (text) → scrubs anything the tool
- // parser held back AND anything we were holding ourselves
- // 4. PathSanitizeStream tail (thinking)
- if (nativeFunctionParser) {
- const nativeTail = nativeFunctionParser.flush();
- const emitted = emitNativeFallbackCalls(nativeTail.toolCalls);
- if (emitted) {
- log.info(`Chat[stream]: native bridge parsed ${emitted} provider-native function_call(s) from stream tail`);
- }
- if (nativeTail.text) emitContent(pathStreamText.feed(nativeTail.text));
+ const ls = getLsForFn(acct.proxy);
+ if (!ls) { lastErr = new Error('No LS instance available'); break; }
+ if (reuseEntry && reuseEntry.lsPort !== ls.port) {
+ log.info(`Chat[${reqId}]: reuse MISS — LS port changed`);
+ checkedOutReuseEntry = null;
+ reuseEntry = null;
}
- if (toolParser) {
- const tail = toolParser.flush();
- if (tail.text) emitContent(pathStreamText.feed(tail.text));
- // M2 allowlist on the tail flush as well — stream end can
- // still emit tail tool_calls and they need the same filter.
- const filteredTail = emulateTools
- ? filterToolCallsByAllowlist(tail.toolCalls, declaredTools)
- : [];
- for (const rawTc of filteredTail) {
- const tc = sanitizeToolCall(repairToolCallArguments(rawTc, messages));
- const idx = collectedToolCalls.length;
- collectedToolCalls.push(tc);
- bridgeDiag.emulatedToolCalls++;
- bridgeDiag.emulatedNames.push(tc.name);
- emitToolCallDelta(tc, idx);
+ const _msgCharsStream = (messages || []).reduce((n, m) => {
+ const c = m?.content;
+ return n + (typeof c === 'string' ? c.length : Array.isArray(c) ? c.reduce((k, p) => k + (typeof p?.text === 'string' ? p.text.length : 0), 0) : 0);
+ }, 0);
+ log.info(`Chat: model=${model} flow=${useCascade ? 'cascade' : 'legacy'} stream=true attempt=${attempt + 1} ${safeAccountRef(acct)} ls=${ls.port} turns=${(messages || []).length} chars=${_msgCharsStream}${reuseEntry ? ' reuse=1' : ''}`);
+ const client = new ClientClass(acct.apiKey, ls.port, ls.csrfToken);
+ let cascadeResult = null;
+ try {
+ if (useCascade) {
+ cascadeResult = await client.cascadeChat(cascadeMessages, modelEnum, modelUid, {
+ onChunk, signal: abortController.signal, reuseEntry,
+ toolPreamble: nativeBridgeOn ? '' : toolPreamble,
+ nativeEnvironment: nativeBridgeOn ? (nativeOpts?.environment || '') : '',
+ displayModel: model,
+ nativeMode: nativeBridgeOn,
+ nativeAllowlist: nativeOpts?.allowlist || null,
+ additionalSteps: nativeOpts?.additionalSteps || null,
+ });
+ } else {
+ await client.rawGetChatMessage(messages, modelEnum, modelUid, { onChunk });
+ }
+ // Flush order matters:
+ // 1. NativeFunctionCallStreamParser tail → may produce provider
+ // native tool_calls withheld from content deltas
+ // 2. ToolCallStreamParser tail → may produce more text deltas
+ // (e.g., a dangling that never closed falls
+ // through as literal text)
+ // 3. PathSanitizeStream tail (text) → scrubs anything the tool
+ // parser held back AND anything we were holding ourselves
+ // 4. PathSanitizeStream tail (thinking)
+ if (nativeFunctionParser) {
+ const nativeTail = nativeFunctionParser.flush();
+ const emitted = emitNativeFallbackCalls(nativeTail.toolCalls);
+ if (emitted) {
+ log.info(`Chat[stream]: native bridge parsed ${emitted} provider-native function_call(s) from stream tail`);
+ }
+ if (nativeTail.text) emitContent(pathStreamText.feed(nativeTail.text));
}
- // Diagnostic: same as nonStreamResponse but for the SSE path —
- // surface why no tool_calls came out when emulation was active.
- // See nonStreamResponse for marker rationale (#109 sub2api E2E).
- // v2.0.72 fix: see non-stream comment — combine accText +
- // accThinking for marker / NLU detection so models that
- // route narrate output through reasoning_content (GLM-4.7,
- // some Claude models in thinking mode) don't slip past.
- const accNarrative = (accText && accText.trim()) ? accText : accThinking;
- if (emulateTools && collectedToolCalls.length === 0 && accNarrative) {
- const head = accNarrative.slice(0, 240).replace(/\s+/g, ' ');
- const markers = [];
- if (/ markup, extract + emit as
- // tool_call delta so client agent loop doesn't break.
- //
- // v2.0.76 (#120 follow-up): widened to fire even when
- // markers were detected but parser produced 0 calls
- // (mirrors the non-stream path).
- if (declaredTools.length > 0) {
- const lastUser = latestRealUserText(messages) || '';
- const recovered = extractIntentFromNarrative(accNarrative, declaredTools, { lastUserText: lastUser, markers });
- if (recovered.length) {
- const recoveredCalls = recovered.map((r, i) => ({
- id: `nlu_${i}_${Date.now().toString(36)}`,
- name: r.name,
- argumentsJson: r.argumentsJson,
- }));
- const filtered = filterToolCallsByAllowlist(recoveredCalls, declaredTools);
- for (const rawTc of filtered) {
- const tc = sanitizeToolCall(repairToolCallArguments(rawTc, messages));
- const idx = collectedToolCalls.length;
- collectedToolCalls.push(tc);
- bridgeDiag.emulatedToolCalls++;
- bridgeDiag.emulatedNames.push(tc.name);
- emitToolCallDelta(tc, idx);
+ if (toolParser) {
+ const tail = toolParser.flush();
+ if (tail.text) emitContent(pathStreamText.feed(tail.text));
+ // M2 allowlist on the tail flush as well — stream end can
+ // still emit tail tool_calls and they need the same filter.
+ const filteredTail = emulateTools
+ ? filterToolCallsByAllowlist(tail.toolCalls, declaredTools)
+ : [];
+ for (const rawTc of filteredTail) {
+ const tc = sanitizeToolCall(repairToolCallArguments(rawTc, messages));
+ const idx = collectedToolCalls.length;
+ collectedToolCalls.push(tc);
+ bridgeDiag.emulatedToolCalls++;
+ bridgeDiag.emulatedNames.push(tc.name);
+ emitToolCallDelta(tc, idx);
+ }
+ // Diagnostic: same as nonStreamResponse but for the SSE path —
+ // surface why no tool_calls came out when emulation was active.
+ // See nonStreamResponse for marker rationale (#109 sub2api E2E).
+ // v2.0.72 fix: see non-stream comment — combine accText +
+ // accThinking for marker / NLU detection so models that
+ // route narrate output through reasoning_content (GLM-4.7,
+ // some Claude models in thinking mode) don't slip past.
+ const accNarrative = (accText && accText.trim()) ? accText : accThinking;
+ if (emulateTools && collectedToolCalls.length === 0 && accNarrative) {
+ const head = accNarrative.slice(0, 240).replace(/\s+/g, ' ');
+ const markers = [];
+ if (/ markup, extract + emit as
+ // tool_call delta so client agent loop doesn't break.
+ //
+ // v2.0.76 (#120 follow-up): widened to fire even when
+ // markers were detected but parser produced 0 calls
+ // (mirrors the non-stream path).
+ if (declaredTools.length > 0) {
+ const lastUser = latestRealUserText(messages) || '';
+ const recovered = extractIntentFromNarrative(accNarrative, declaredTools, { lastUserText: lastUser, markers });
+ if (recovered.length) {
+ const recoveredCalls = recovered.map((r, i) => ({
+ id: `nlu_${i}_${Date.now().toString(36)}`,
+ name: r.name,
+ argumentsJson: r.argumentsJson,
+ }));
+ const filtered = filterToolCallsByAllowlist(recoveredCalls, declaredTools);
+ for (const rawTc of filtered) {
+ const tc = sanitizeToolCall(repairToolCallArguments(rawTc, messages));
+ const idx = collectedToolCalls.length;
+ collectedToolCalls.push(tc);
+ bridgeDiag.emulatedToolCalls++;
+ bridgeDiag.emulatedNames.push(tc.name);
+ emitToolCallDelta(tc, idx);
+ }
+ if (filtered.length) {
+ log.info(`Chat[stream]: NLU recovery — promoted ${filtered.length} narrative tool_call(s) mid-stream (markers=${markers.join(',') || 'none'})`);
+ }
}
- if (filtered.length) {
- log.info(`Chat[stream]: NLU recovery — promoted ${filtered.length} narrative tool_call(s) mid-stream (markers=${markers.join(',') || 'none'})`);
+ }
+ // v2.0.71 (#115) — fabricate detection on stream tail
+ // (only if NLU didn't recover anything).
+ if (markers.length === 0 && collectedToolCalls.length === 0) {
+ const lastUser = latestRealUserText(messages) || '';
+ const fab = detectFabricatedToolResult(accNarrative, { lastUserText: lastUser });
+ if (fab) {
+ log.warn(`Chat[stream]: fabricate detected — model=${modelKey} pattern=${fab.matchedPattern} sample="${fab.sample}"`);
}
}
}
- // v2.0.71 (#115) — fabricate detection on stream tail
- // (only if NLU didn't recover anything).
- if (markers.length === 0 && collectedToolCalls.length === 0) {
- const lastUser = latestRealUserText(messages) || '';
- const fab = detectFabricatedToolResult(accNarrative, { lastUserText: lastUser });
- if (fab) {
- log.warn(`Chat[stream]: fabricate detected — model=${modelKey} pattern=${fab.matchedPattern} sample="${fab.sample}"`);
+ }
+ emitContent(pathStreamText.flush());
+ emitThinking(pathStreamThinking.flush());
+
+ // v2.0.65 native bridge: cascade trajectory steps come back on
+ // cascadeResult.toolCalls with cascade_native:true. Translate
+ // each into the caller's OpenAI tool name + reverse-mapped
+ // args, allowlist-filter, then emit as tool_call deltas. We do
+ // this at the tail (after pathStreamText flush) rather than
+ // mid-stream because cascadeChat doesn't expose per-step
+ // callbacks for native steps yet — clients see one batched
+ // tool_calls turn instead of fully-streamed deltas. That's a
+ // known gap; trades streaming-grain for shipping a working
+ // bridge first.
+ // v2.0.70 — onChunk now emits cascade native tool_calls
+ // mid-stream (see "Cascade native trajectory tool_call
+ // streamed live" branch above). The batch path here only
+ // catches the tail case where collectedToolCalls is still
+ // empty after stream end (e.g. final-sweep step came late);
+ // dedupe by id so we never emit a tool_call twice.
+ if (nativeBridgeOn && cascadeResult?.toolCalls?.length && collectedToolCalls.length === 0) {
+ const lookup = nativeOpts?.callerLookup || new Map();
+ const nativeRaw = [];
+ for (const raw of cascadeResult.toolCalls) {
+ if (!raw?.cascade_native) continue;
+ if (markBridgeDiagCascadeRaw(raw)) {
+ recordNativeBridgeCascadeToolCall(raw.name);
+ }
+ if (isCompletedReadUrlNativeResult(raw)) {
+ if (!completedNativeReadUrlResults.some(existing => (existing.id || '') === (raw.id || ''))) {
+ completedNativeReadUrlResults.push(raw);
+ }
+ continue;
}
+ const candidates = lookup.get(raw.name) || [];
+ const callerName = candidates[0];
+ if (!callerName) {
+ bridgeDiag.unmappedToolCalls++;
+ bridgeDiag.unmappedKinds.push(raw.name);
+ recordNativeBridgeUnmappedCascadeToolCall(raw.name);
+ continue;
+ }
+ const reverseFn = TOOL_MAP[callerName]?.reverse;
+ let cascadeArgs;
+ try { cascadeArgs = JSON.parse(raw.argumentsJson || '{}'); } catch { bridgeDiag.argParseFailures++; cascadeArgs = {}; }
+ let openaiArgs;
+ try { openaiArgs = reverseFn ? reverseFn(cascadeArgs) : cascadeArgs; }
+ catch { bridgeDiag.reverseFailures++; openaiArgs = cascadeArgs; }
+ bridgeDiag.mappedToolCalls++;
+ bridgeDiag.mappedNames.push(callerName);
+ nativeRaw.push({
+ id: raw.id || `call_${nativeRaw.length}_${Date.now().toString(36)}`,
+ name: callerName,
+ argumentsJson: JSON.stringify(openaiArgs ?? {}),
+ });
}
- }
- }
- emitContent(pathStreamText.flush());
- emitThinking(pathStreamThinking.flush());
-
- // v2.0.65 native bridge: cascade trajectory steps come back on
- // cascadeResult.toolCalls with cascade_native:true. Translate
- // each into the caller's OpenAI tool name + reverse-mapped
- // args, allowlist-filter, then emit as tool_call deltas. We do
- // this at the tail (after pathStreamText flush) rather than
- // mid-stream because cascadeChat doesn't expose per-step
- // callbacks for native steps yet — clients see one batched
- // tool_calls turn instead of fully-streamed deltas. That's a
- // known gap; trades streaming-grain for shipping a working
- // bridge first.
- // v2.0.70 — onChunk now emits cascade native tool_calls
- // mid-stream (see "Cascade native trajectory tool_call
- // streamed live" branch above). The batch path here only
- // catches the tail case where collectedToolCalls is still
- // empty after stream end (e.g. final-sweep step came late);
- // dedupe by id so we never emit a tool_call twice.
- if (nativeBridgeOn && cascadeResult?.toolCalls?.length && collectedToolCalls.length === 0) {
- const lookup = nativeOpts?.callerLookup || new Map();
- const nativeRaw = [];
- for (const raw of cascadeResult.toolCalls) {
- if (!raw?.cascade_native) continue;
- if (markBridgeDiagCascadeRaw(raw)) {
- recordNativeBridgeCascadeToolCall(raw.name);
+ const filteredNative = filterToolCallsByAllowlist(nativeRaw, declaredTools);
+ for (const rawTc of filteredNative) {
+ const tc = sanitizeToolCall(repairToolCallArguments(rawTc, messages));
+ const idx = collectedToolCalls.length;
+ collectedToolCalls.push(tc);
+ recordNativeBridgeEmittedToolCall(tc.name, { source: 'cascade' });
+ emitToolCallDelta(tc, idx);
}
- if (isCompletedReadUrlNativeResult(raw)) {
- if (!completedNativeReadUrlResults.some(existing => (existing.id || '') === (raw.id || ''))) {
- completedNativeReadUrlResults.push(raw);
- }
- continue;
+ if (filteredNative.length === 0 && completedNativeReadUrlResults.length === 0 && cascadeResult.toolCalls.some(tc => tc.cascade_native)) {
+ log.info(`Chat[stream]: nativeBridge=true received cascade tool calls but none mapped to caller tools (kinds=${cascadeResult.toolCalls.filter(tc => tc.cascade_native).map(tc => tc.name).join(',')})`);
}
- const candidates = lookup.get(raw.name) || [];
- const callerName = candidates[0];
- if (!callerName) {
- bridgeDiag.unmappedToolCalls++;
- bridgeDiag.unmappedKinds.push(raw.name);
- recordNativeBridgeUnmappedCascadeToolCall(raw.name);
- continue;
+ }
+ if (accText.length === 0 && collectedToolCalls.length === 0 && completedNativeReadUrlResults.length) {
+ const fallbackText = completedNativeReadUrlResults.map(raw => raw.result).filter(Boolean).join('\n');
+ if (fallbackText) emitContent(pathStreamText.feed(fallbackText));
+ emitContent(pathStreamText.flush());
+ }
+ if (nativeBridgeOn && collectedToolCalls.length === 0 && completedNativeReadUrlResults.length === 0) recordNativeBridgeNoToolCallResponse();
+ bridgeDiag.totalToolCalls = collectedToolCalls.length;
+ bridgeDiag.noToolCalls = bridgeDiag.requestedTools && collectedToolCalls.length === 0 && completedNativeReadUrlResults.length === 0;
+ logBridgeResultDiagnostics(reqId, bridgeDiag);
+ // Pool check-in on success (cascade only)
+ if (reuseEnabled && cascadeResult?.cascadeId && (accText || collectedToolCalls.length)) {
+ const turnComplete = appendAssistantTurn(messages, accText, collectedToolCalls);
+ const fpAfter = fingerprintAfter(turnComplete, modelKey, callerKey, fpOpts);
+ const ttlHint = ttlHintFromCachePolicy(cachePolicy);
+ poolCheckin(fpAfter, {
+ cascadeId: cascadeResult.cascadeId,
+ sessionId: cascadeResult.sessionId,
+ lsPort: ls.port,
+ lsGeneration: cascadeResult.lsGeneration || ls.generation,
+ apiKey: currentApiKey,
+ modelKey: modelKey || '',
+ stepOffset: Number.isFinite(cascadeResult.stepOffset) ? cascadeResult.stepOffset : reuseEntry?.stepOffset,
+ generatorOffset: Number.isFinite(cascadeResult.generatorOffset) ? cascadeResult.generatorOffset : reuseEntry?.generatorOffset,
+ historyCoverage: cascadeResult.historyCoverage || reuseEntry?.historyCoverage || null,
+ createdAt: reuseEntry?.createdAt,
+ }, callerKey, ttlHint === undefined ? 0 : ttlHint);
+
+ // Bind caller to this account for the next turn
+ if (callerKey && isStickyEnabled() && acct) {
+ setStickyBinding(callerKey, modelKey, acct.id, acct.apiKey);
}
- const reverseFn = TOOL_MAP[callerName]?.reverse;
- let cascadeArgs;
- try { cascadeArgs = JSON.parse(raw.argumentsJson || '{}'); } catch { bridgeDiag.argParseFailures++; cascadeArgs = {}; }
- let openaiArgs;
- try { openaiArgs = reverseFn ? reverseFn(cascadeArgs) : cascadeArgs; }
- catch { bridgeDiag.reverseFailures++; openaiArgs = cascadeArgs; }
- bridgeDiag.mappedToolCalls++;
- bridgeDiag.mappedNames.push(callerName);
- nativeRaw.push({
- id: raw.id || `call_${nativeRaw.length}_${Date.now().toString(36)}`,
- name: callerName,
- argumentsJson: JSON.stringify(openaiArgs ?? {}),
+ }
+ // success
+ if (hadSuccess) reportSuccess(currentApiKey);
+ updateCapability(currentApiKey, modelKey, true, 'success');
+ recordRequest(model, true, Date.now() - startTime, currentApiKey);
+ if (!rolePrinted) {
+ send({
+ id, object: 'chat.completion.chunk', created, model,
+ choices: [{ index: 0, delta: { role: 'assistant', content: '' }, finish_reason: null }]
});
}
- const filteredNative = filterToolCallsByAllowlist(nativeRaw, declaredTools);
- for (const rawTc of filteredNative) {
- const tc = sanitizeToolCall(repairToolCallArguments(rawTc, messages));
- const idx = collectedToolCalls.length;
- collectedToolCalls.push(tc);
- recordNativeBridgeEmittedToolCall(tc.name, { source: 'cascade' });
- emitToolCallDelta(tc, idx);
+ // For response_format=json_* we buffered all content — flush one
+ // clean JSON payload now. extractJsonPayload strips fences and
+ // any preamble text, returning raw parseable JSON (or the
+ // trimmed original when nothing parses).
+ if (wantJson && accText) {
+ const cleaned = stabilizeJsonPayload(accText, messages);
+ if (cleaned) {
+ send({
+ id, object: 'chat.completion.chunk', created, model,
+ choices: [{ index: 0, delta: { content: cleaned }, finish_reason: null }]
+ });
+ accText = cleaned;
+ }
}
- if (filteredNative.length === 0 && completedNativeReadUrlResults.length === 0 && cascadeResult.toolCalls.some(tc => tc.cascade_native)) {
- log.info(`Chat[stream]: nativeBridge=true received cascade tool calls but none mapped to caller tools (kinds=${cascadeResult.toolCalls.filter(tc => tc.cascade_native).map(tc => tc.name).join(',')})`);
+ // GLM5.1 silence fallback (#86 follow-up KLFDan0534) — see
+ // shouldFallbackThinkingToText comment for rationale.
+ // Inside streamResponse the routing key arrives as the
+ // `modelKey` param (caller passes routingModelKey there);
+ // wantThinking comes through deps because body isn't in
+ // scope here (#93 follow-up zhangzhang-bit).
+ if (shouldFallbackThinkingToText({
+ routingModelKey: modelKey,
+ wantThinking: deps.wantThinking,
+ accText,
+ accThinking,
+ hasToolCalls: collectedToolCalls.length > 0,
+ })) {
+ log.info(`Chat[${reqId}]: thinking-only stream from non-reasoning model ${modelKey}; promoting ${accThinking.length}c thinking → content`);
+ send({
+ id, object: 'chat.completion.chunk', created, model,
+ choices: [{ index: 0, delta: { content: accThinking }, finish_reason: null }]
+ });
+ accText = accThinking;
+ accThinking = '';
}
- }
- if (accText.length === 0 && collectedToolCalls.length === 0 && completedNativeReadUrlResults.length) {
- const fallbackText = completedNativeReadUrlResults.map(raw => raw.result).filter(Boolean).join('\n');
- if (fallbackText) emitContent(pathStreamText.feed(fallbackText));
- emitContent(pathStreamText.flush());
- }
- if (nativeBridgeOn && collectedToolCalls.length === 0 && completedNativeReadUrlResults.length === 0) recordNativeBridgeNoToolCallResponse();
- bridgeDiag.totalToolCalls = collectedToolCalls.length;
- bridgeDiag.noToolCalls = bridgeDiag.requestedTools && collectedToolCalls.length === 0 && completedNativeReadUrlResults.length === 0;
- logBridgeResultDiagnostics(reqId, bridgeDiag);
- // Pool check-in on success (cascade only)
- if (reuseEnabled && cascadeResult?.cascadeId && (accText || collectedToolCalls.length)) {
- const turnComplete = appendAssistantTurn(messages, accText, collectedToolCalls);
- const fpAfter = fingerprintAfter(turnComplete, modelKey, callerKey, fpOpts);
- const ttlHint = ttlHintFromCachePolicy(cachePolicy);
- poolCheckin(fpAfter, {
- cascadeId: cascadeResult.cascadeId,
- sessionId: cascadeResult.sessionId,
- lsPort: ls.port,
- lsGeneration: cascadeResult.lsGeneration || ls.generation,
- apiKey: currentApiKey,
- modelKey: modelKey || '',
- stepOffset: Number.isFinite(cascadeResult.stepOffset) ? cascadeResult.stepOffset : reuseEntry?.stepOffset,
- generatorOffset: Number.isFinite(cascadeResult.generatorOffset) ? cascadeResult.generatorOffset : reuseEntry?.generatorOffset,
- historyCoverage: cascadeResult.historyCoverage || reuseEntry?.historyCoverage || null,
- createdAt: reuseEntry?.createdAt,
- }, callerKey, ttlHint === undefined ? 0 : ttlHint);
-
- // Bind caller to this account for the next turn
- if (callerKey && isStickyEnabled() && acct) {
- setStickyBinding(callerKey, modelKey, acct.id, acct.apiKey);
+ const finalReason = collectedToolCalls.length ? 'tool_calls' : 'stop';
+ // OpenAI spec: the finish_reason chunk carries NO usage, then a
+ // separate terminal chunk has empty choices[] + usage
+ // (stream_options.include_usage convention). Emitting usage on
+ // both made some clients double-count billing. Drop the first.
+ send({
+ id, object: 'chat.completion.chunk', created, model,
+ choices: [{ index: 0, delta: {}, finish_reason: finalReason }]
+ });
+ {
+ const usage = buildUsageBody(cascadeResult?.usage || null, messages, accText, accThinking, cachePolicy);
+ try { recordTokenUsage(usage); } catch { }
+ send({
+ id, object: 'chat.completion.chunk', created, model,
+ choices: [], usage
+ });
}
- }
- // success
- if (hadSuccess) reportSuccess(currentApiKey);
- updateCapability(currentApiKey, modelKey, true, 'success');
- recordRequest(model, true, Date.now() - startTime, currentApiKey);
- if (!rolePrinted) {
- send({ id, object: 'chat.completion.chunk', created, model,
- choices: [{ index: 0, delta: { role: 'assistant', content: '' }, finish_reason: null }] });
- }
- // For response_format=json_* we buffered all content — flush one
- // clean JSON payload now. extractJsonPayload strips fences and
- // any preamble text, returning raw parseable JSON (or the
- // trimmed original when nothing parses).
- if (wantJson && accText) {
- const cleaned = stabilizeJsonPayload(accText, messages);
- if (cleaned) {
- send({ id, object: 'chat.completion.chunk', created, model,
- choices: [{ index: 0, delta: { content: cleaned }, finish_reason: null }] });
- accText = cleaned;
+ if (!res.writableEnded) { res.write('data: [DONE]\n\n'); res.end(); }
+ if (ckey && !collectedToolCalls.length && (accText || accThinking)) {
+ cacheSet(ckey, { text: accText, thinking: accThinking });
}
- }
- // GLM5.1 silence fallback (#86 follow-up KLFDan0534) — see
- // shouldFallbackThinkingToText comment for rationale.
- // Inside streamResponse the routing key arrives as the
- // `modelKey` param (caller passes routingModelKey there);
- // wantThinking comes through deps because body isn't in
- // scope here (#93 follow-up zhangzhang-bit).
- if (shouldFallbackThinkingToText({
- routingModelKey: modelKey,
- wantThinking: deps.wantThinking,
- accText,
- accThinking,
- hasToolCalls: collectedToolCalls.length > 0,
- })) {
- log.info(`Chat[${reqId}]: thinking-only stream from non-reasoning model ${modelKey}; promoting ${accThinking.length}c thinking → content`);
- send({ id, object: 'chat.completion.chunk', created, model,
- choices: [{ index: 0, delta: { content: accThinking }, finish_reason: null }] });
- accText = accThinking;
- accThinking = '';
- }
- const finalReason = collectedToolCalls.length ? 'tool_calls' : 'stop';
- // OpenAI spec: the finish_reason chunk carries NO usage, then a
- // separate terminal chunk has empty choices[] + usage
- // (stream_options.include_usage convention). Emitting usage on
- // both made some clients double-count billing. Drop the first.
- send({ id, object: 'chat.completion.chunk', created, model,
- choices: [{ index: 0, delta: {}, finish_reason: finalReason }] });
- {
- const usage = buildUsageBody(cascadeResult?.usage || null, messages, accText, accThinking, cachePolicy);
- try { recordTokenUsage(usage); } catch {}
- send({ id, object: 'chat.completion.chunk', created, model,
- choices: [], usage });
- }
- if (!res.writableEnded) { res.write('data: [DONE]\n\n'); res.end(); }
- if (ckey && !collectedToolCalls.length && (accText || accThinking)) {
- cacheSet(ckey, { text: accText, thinking: accThinking });
- }
- return;
- } catch (err) {
- lastErr = err;
- reuseEntry = null; // don't try to reuse on retry
- // v2.0.25 HIGH-2: client.js marks the error when it tried to
- // recover from a "cascade not found" but couldn't. The entry
- // we held is dead — never restore it on the way out.
- if (err.reuseEntryInvalid) reuseEntryDead = true;
- // #101 (nalayahfowlkest-ship-it): when the upstream model
- // provider times out mid-stream ("context deadline exceeded"
- // / "Client.Timeout or context cancellation while reading
- // body"), the cascade trajectory is left in an inconsistent
- // state — the assistant never finished, but the prior
- // tool_result is still in there. Restoring this cascade to
- // the pool causes the NEXT request to reuse a half-broken
- // trajectory, and the model only sees the trailing tool
- // result with no earlier user prompts ("I can see the
- // content from a previous tool call ... but I don't have
- // the earlier conversation context").
- const isDeadline = isUpstreamDeadlineExceeded(err);
- if (isDeadline) {
- reuseEntryDead = true;
- }
- const isAuthFail = /unauthenticated|invalid api key|invalid_grant|permission_denied.*account/i.test(err.message);
- const isRateLimit = /rate limit|rate_limit|too many requests|quota/i.test(err.message);
- const isInternal = /internal error occurred.*error id/i.test(err.message);
- const isTransport = isCascadeTransportError(err);
- const isTransient = !isDeadline && isUpstreamTransientError(err, isInternal);
- // v2.0.61 (#113) — same policy detection as nonStreamResponse.
- const isPolicyBlocked = /cyber\s*verification|content[\s_-]+policy|policy[\s_-]+(?:violation|blocked|denied)|safety[\s_-]+(?:policy|blocked)|prompt[\s_-]+(?:rejected|blocked)\s+by[\s_-]+policy|usage[\s_-]+policy[\s_-]+violation/i.test(err.message);
- if (isAuthFail) reportError(currentApiKey);
- if (isRateLimit) { recordRateLimited(); markRateLimited(currentApiKey, rateLimitCooldownMs(err.message), modelKey); err.isRateLimit = true; err.isModelError = true; err.kind ||= 'model_error'; }
- // v2.0.91 — IP-level rate limit circuit breaker (stream path).
- // Same logic as non-stream: ≥3 accounts rate-limited for the
- // same model within 8s → Windsurf is doing IP-wide cooldown,
- // stop burning accounts and surface immediately.
- const ctx = deps.context || {};
- if (isRateLimit && !ctx.__rlAborted) {
- if (!ctx.__rateLimitEvents) ctx.__rateLimitEvents = [];
- const RL_WINDOW_MS = 8_000;
- const RL_BURST_THRESHOLD = 3;
- const now = Date.now();
- ctx.__rateLimitEvents.push({
- time: now,
- model: modelKey,
- account: acct?.id,
- cooldownMs: rateLimitBurstCooldownMs({
- message: err.message || '',
- retryAfterMs: err.retry_after_ms,
- apiKey: currentApiKey,
- modelKey,
- }),
- });
- const cutoff = now - RL_WINDOW_MS;
- while (ctx.__rateLimitEvents.length && ctx.__rateLimitEvents[0].time < cutoff) {
- ctx.__rateLimitEvents.shift();
+ return;
+ } catch (err) {
+ lastErr = err;
+ reuseEntry = null; // don't try to reuse on retry
+ // v2.0.25 HIGH-2: client.js marks the error when it tried to
+ // recover from a "cascade not found" but couldn't. The entry
+ // we held is dead — never restore it on the way out.
+ if (err.reuseEntryInvalid) reuseEntryDead = true;
+ // #101 (nalayahfowlkest-ship-it): when the upstream model
+ // provider times out mid-stream ("context deadline exceeded"
+ // / "Client.Timeout or context cancellation while reading
+ // body"), the cascade trajectory is left in an inconsistent
+ // state — the assistant never finished, but the prior
+ // tool_result is still in there. Restoring this cascade to
+ // the pool causes the NEXT request to reuse a half-broken
+ // trajectory, and the model only sees the trailing tool
+ // result with no earlier user prompts ("I can see the
+ // content from a previous tool call ... but I don't have
+ // the earlier conversation context").
+ const isDeadline = isUpstreamDeadlineExceeded(err);
+ if (isDeadline) {
+ reuseEntryDead = true;
}
- const sameModelBurst = ctx.__rateLimitEvents.filter(e => e.model === modelKey);
- if (sameModelBurst.length >= RL_BURST_THRESHOLD) {
- ctx.__rlAborted = true;
- log.warn(`Chat[${reqId}] stream: IP-rate-limit burst — ${sameModelBurst.length} accounts rate-limited on ${model} within ${RL_WINDOW_MS}ms. Short-circuiting.`);
- const cooldown = Math.max(...sameModelBurst.map(e => e.cooldownMs || IP_RATE_LIMIT_BURST_FLOOR_MS));
- lastErr = Object.assign(new Error(`All accounts temporarily rate-limited on ${model}. Windsurf upstream is applying IP-level cooldown. Wait ~${formatRetryAfter(cooldown)} before retrying.`), { type: 'rate_limit_exceeded', retry_after_ms: cooldown });
+ const isAuthFail = /unauthenticated|invalid api key|invalid_grant|permission_denied.*account/i.test(err.message);
+ const isRateLimit = /rate limit|rate_limit|too many requests|quota/i.test(err.message);
+ const isInternal = /internal error occurred.*error id/i.test(err.message);
+ const isTransport = isCascadeTransportError(err);
+ const isTransient = !isDeadline && isUpstreamTransientError(err, isInternal);
+ // v2.0.61 (#113) — same policy detection as nonStreamResponse.
+ const isPolicyBlocked = /cyber\s*verification|content[\s_-]+policy|policy[\s_-]+(?:violation|blocked|denied)|safety[\s_-]+(?:policy|blocked)|prompt[\s_-]+(?:rejected|blocked)\s+by[\s_-]+policy|usage[\s_-]+policy[\s_-]+violation/i.test(err.message);
+ if (isAuthFail) reportError(currentApiKey);
+ if (isRateLimit) { recordRateLimited(); markRateLimited(currentApiKey, rateLimitCooldownMs(err.message), modelKey); err.isRateLimit = true; err.isModelError = true; err.kind ||= 'model_error'; }
+ // v2.0.91 — IP-level rate limit circuit breaker (stream path).
+ // Same logic as non-stream: ≥3 accounts rate-limited for the
+ // same model within 8s → Windsurf is doing IP-wide cooldown,
+ // stop burning accounts and surface immediately.
+ const ctx = deps.context || {};
+ if (isRateLimit && !ctx.__rlAborted) {
+ if (!ctx.__rateLimitEvents) ctx.__rateLimitEvents = [];
+ const RL_WINDOW_MS = 8_000;
+ const RL_BURST_THRESHOLD = 3;
+ const now = Date.now();
+ ctx.__rateLimitEvents.push({
+ time: now,
+ model: modelKey,
+ account: acct?.id,
+ cooldownMs: rateLimitBurstCooldownMs({
+ message: err.message || '',
+ retryAfterMs: err.retry_after_ms,
+ apiKey: currentApiKey,
+ modelKey,
+ }),
+ });
+ const cutoff = now - RL_WINDOW_MS;
+ while (ctx.__rateLimitEvents.length && ctx.__rateLimitEvents[0].time < cutoff) {
+ ctx.__rateLimitEvents.shift();
+ }
+ const sameModelBurst = ctx.__rateLimitEvents.filter(e => e.model === modelKey);
+ if (sameModelBurst.length >= RL_BURST_THRESHOLD) {
+ ctx.__rlAborted = true;
+ log.warn(`Chat[${reqId}] stream: IP-rate-limit burst — ${sameModelBurst.length} accounts rate-limited on ${model} within ${RL_WINDOW_MS}ms. Short-circuiting.`);
+ const cooldown = Math.max(...sameModelBurst.map(e => e.cooldownMs || IP_RATE_LIMIT_BURST_FLOOR_MS));
+ lastErr = Object.assign(new Error(`All accounts temporarily rate-limited on ${model}. Windsurf upstream is applying IP-level cooldown. Wait ~${formatRetryAfter(cooldown)} before retrying.`), { type: 'rate_limit_exceeded', retry_after_ms: cooldown });
+ break;
+ }
+ }
+ if (isInternal) { reportInternalError(currentApiKey); err.isModelError = true; err.kind ||= 'transient_stall'; }
+ if (isPolicyBlocked) { recordPolicyBlocked(); err.isPolicyBlocked = true; err.isModelError = true; err.kind = 'policy_blocked'; }
+ if (isTransport) { err.isModelError = true; err.kind ||= 'transient_stall'; }
+ // v2.0.56 stream-path ban detection — same 2-strike logic as
+ // non-stream. See nonStreamResponse for rationale.
+ if (!isRateLimit && looksLikeBanSignal(err.message)) {
+ reportBanSignal(currentApiKey, err.message);
+ err.isModelError = true; err.kind ||= 'auth_error';
+ }
+ if (err.isModelError && err.kind !== 'transient_stall' && !isRateLimit && !isInternal) {
+ updateCapability(currentApiKey, modelKey, false, 'model_error');
+ }
+ if (isRateLimit && strictReuse && checkedOutReuseEntry && fpBefore && checkedOutReuseEntry.apiKey === currentApiKey) {
+ log.info(`Chat[${reqId}]: strict reuse preserved cascade after rate limit`);
break;
}
- }
- if (isInternal) { reportInternalError(currentApiKey); err.isModelError = true; err.kind ||= 'transient_stall'; }
- if (isPolicyBlocked) { recordPolicyBlocked(); err.isPolicyBlocked = true; err.isModelError = true; err.kind = 'policy_blocked'; }
- if (isTransport) { err.isModelError = true; err.kind ||= 'transient_stall'; }
- // v2.0.56 stream-path ban detection — same 2-strike logic as
- // non-stream. See nonStreamResponse for rationale.
- if (!isRateLimit && looksLikeBanSignal(err.message)) {
- reportBanSignal(currentApiKey, err.message);
- err.isModelError = true; err.kind ||= 'auth_error';
- }
- if (err.isModelError && err.kind !== 'transient_stall' && !isRateLimit && !isInternal) {
- updateCapability(currentApiKey, modelKey, false, 'model_error');
- }
- if (isRateLimit && strictReuse && checkedOutReuseEntry && fpBefore && checkedOutReuseEntry.apiKey === currentApiKey) {
- log.info(`Chat[${reqId}]: strict reuse preserved cascade after rate limit`);
- break;
- }
- // v2.0.61 (#113): policy refusal isn't account-bound, drop
- // out of the retry loop immediately and let the SSE error
- // path emit a 451-style chunk to the client.
- if (isPolicyBlocked) {
- log.warn(`Chat[${reqId}] stream: policy_blocked on ${safeKeyRef(currentApiKey, 'apiKey')}, not retrying`);
- break;
- }
- if (isDeadline) {
- err.type = 'upstream_deadline_exceeded';
- err.code = 'windsurf_provider_deadline';
- break;
- }
- // Retry only if nothing has been streamed yet AND it's a retryable error
- if (!hadSuccess && (err.isModelError || isRateLimit)) {
- if (acct?._sticky && isExperimentalEnabled('stickyNoFallback')) {
- const tag = isRateLimit ? 'rate_limit' : isTransient ? 'upstream_transient' : 'model_error';
- log.warn(`Account ${safeAccountRef(acct)} (sticky-bound) failed (${tag}) on ${model}, stickyNoFallback enabled — not trying other accounts`);
+ // v2.0.61 (#113): policy refusal isn't account-bound, drop
+ // out of the retry loop immediately and let the SSE error
+ // path emit a 451-style chunk to the client.
+ if (isPolicyBlocked) {
+ log.warn(`Chat[${reqId}] stream: policy_blocked on ${safeKeyRef(currentApiKey, 'apiKey')}, not retrying`);
break;
}
- const tag = isRateLimit ? 'rate_limit' : isTransient ? 'upstream_transient' : 'model_error';
- if (isTransient) {
- streamInternalCount++;
- const backoffMs = await internalErrorBackoff(streamInternalCount - 1);
- log.warn(`Chat[${reqId}] stream: ${safeAccountRef(acct)} upstream transient error (${isTransport ? 'cascade_transport' : 'internal_error'}), waited ${backoffMs}ms before next account`);
- } else {
- log.warn(`Account ${safeAccountRef(acct)} failed (${tag}) on ${model}, trying next`);
+ if (isDeadline) {
+ err.type = 'upstream_deadline_exceeded';
+ err.code = 'windsurf_provider_deadline';
+ break;
}
- continue;
+ // Retry only if nothing has been streamed yet AND it's a retryable error
+ if (!hadSuccess && (err.isModelError || isRateLimit)) {
+ if (acct?._sticky && isExperimentalEnabled('stickyNoFallback')) {
+ const tag = isRateLimit ? 'rate_limit' : isTransient ? 'upstream_transient' : 'model_error';
+ log.warn(`Account ${safeAccountRef(acct)} (sticky-bound) failed (${tag}) on ${model}, stickyNoFallback enabled — not trying other accounts`);
+ break;
+ }
+ const tag = isRateLimit ? 'rate_limit' : isTransient ? 'upstream_transient' : 'model_error';
+ if (isTransient) {
+ streamInternalCount++;
+ const backoffMs = await internalErrorBackoff(streamInternalCount - 1);
+ log.warn(`Chat[${reqId}] stream: ${safeAccountRef(acct)} upstream transient error (${isTransport ? 'cascade_transport' : 'internal_error'}), waited ${backoffMs}ms before next account`);
+ } else {
+ log.warn(`Account ${safeAccountRef(acct)} failed (${tag}) on ${model}, trying next`);
+ }
+ continue;
+ }
+ break;
}
- break;
- }
} finally {
// Pair every successful getApiKey/acquireAccountByKey with a
// release so the in-flight balancer in auth.js (issue #37)
@@ -3995,14 +4050,14 @@ function streamResponse(id, created, model, modelKey, provider, messages, cascad
const errMsg = allInternal
? upstreamTransientErrorMessage(model, tried.length, lastIsTransport ? 'cascade_transport' : 'internal_error')
: deadlineExceeded
- ? upstreamDeadlineExceededMessage(model)
- : poolExhausted
- ? sanitizeText(lastErr?.message || 'language server pool exhausted')
- : temporaryUnavailable.allUnavailable
- ? `${model} 所有账号暂时不可用,请 ${Math.ceil(temporaryUnavailable.retryAfterMs / 1000)} 秒后重试`
- : rl.allLimited
- ? `${model} 所有账号均已达速率限制,请 ${Math.ceil(rl.retryAfterMs / 1000)} 秒后重试`
- : sanitizeText(lastErr?.message || 'no accounts');
+ ? upstreamDeadlineExceededMessage(model)
+ : poolExhausted
+ ? sanitizeText(lastErr?.message || 'language server pool exhausted')
+ : temporaryUnavailable.allUnavailable
+ ? `${model} 所有账号暂时不可用,请 ${Math.ceil(temporaryUnavailable.retryAfterMs / 1000)} 秒后重试`
+ : rl.allLimited
+ ? `${model} 所有账号均已达速率限制,请 ${Math.ceil(rl.retryAfterMs / 1000)} 秒后重试`
+ : sanitizeText(lastErr?.message || 'no accounts');
if (allInternal) {
log.error(`Chat[${reqId}] stream: ${tried.length}/${tried.length} accounts hit upstream transient error — surfacing upstream_transient_error`);
}
@@ -4027,15 +4082,15 @@ function streamResponse(id, created, model, modelKey, provider, messages, cascad
? 'upstream_transient_error'
: deadlineExceeded
? 'upstream_deadline_exceeded'
- : poolExhausted
- ? 'ls_pool_exhausted'
- : (temporaryUnavailable.allUnavailable || lastErr?.type === 'rate_limit_exceeded')
- ? 'rate_limit_exceeded'
- : 'upstream_error';
+ : poolExhausted
+ ? 'ls_pool_exhausted'
+ : (temporaryUnavailable.allUnavailable || lastErr?.type === 'rate_limit_exceeded')
+ ? 'rate_limit_exceeded'
+ : 'upstream_error';
send(chatStreamError(errMsg, errType, deadlineExceeded ? 'windsurf_provider_deadline' : null));
}
if (!emittedClientPayload) res.write('data: [DONE]\n\n');
- } catch {}
+ } catch { }
if (!res.writableEnded) res.end();
} finally {
unregisterSse();