Skip to content

Commit ca20575

Browse files
fix: detect silent SSE connection drops during long tool executions
When Claude runs a long script, the SSE connection can be silently dropped by transport layers (OS TCP stack, Electron internals, HTTP idle timeouts). Previously this caused the stream to appear "completed" with partial content and no error message. Two-part fix: Server-side heartbeat (claude-client.ts): - Send keep_alive SSE events every 30s independently of SDK activity - Prevents intermediate layers from considering the connection idle - Timer properly cleaned up in all exit paths including cancel() Client-side detection (useSSEStream.ts + stream-session-manager.ts): - Track whether the server's 'done' SSE event was received - If reader finishes without 'done', treat as connection drop - Show "Connection lost" error and clear stale SDK session - Flush residual SSE buffer after reader signals done i18n (en.ts + zh.ts): - Add streaming.connectionDrop translation key Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent c8a1581 commit ca20575

5 files changed

Lines changed: 78 additions & 2 deletions

File tree

src/hooks/useSSEStream.ts

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -237,11 +237,12 @@ function handleSSEEvent(
237237
export async function consumeSSEStream(
238238
reader: ReadableStreamDefaultReader<Uint8Array>,
239239
callbacks: SSECallbacks,
240-
): Promise<{ accumulated: string; tokenUsage: TokenUsage | null }> {
240+
): Promise<{ accumulated: string; tokenUsage: TokenUsage | null; receivedDone: boolean }> {
241241
const decoder = new TextDecoder();
242242
let buffer = '';
243243
let accumulated = '';
244244
let tokenUsage: TokenUsage | null = null;
245+
let receivedDone = false;
245246

246247
const wrappedCallbacks: SSECallbacks = {
247248
...callbacks,
@@ -264,14 +265,30 @@ export async function consumeSSEStream(
264265

265266
try {
266267
const event: SSEEvent = JSON.parse(line.slice(6));
268+
if (event.type === 'done') {
269+
receivedDone = true;
270+
}
267271
accumulated = handleSSEEvent(event, accumulated, wrappedCallbacks);
268272
} catch {
269273
// skip malformed SSE lines
270274
}
271275
}
272276
}
273277

274-
return { accumulated, tokenUsage };
278+
// Flush any residual buffer in case the final chunk didn't end with \n
279+
if (buffer.trim().startsWith('data: ')) {
280+
try {
281+
const event: SSEEvent = JSON.parse(buffer.trim().slice(6));
282+
if (event.type === 'done') {
283+
receivedDone = true;
284+
}
285+
accumulated = handleSSEEvent(event, accumulated, wrappedCallbacks);
286+
} catch {
287+
// skip malformed residual data
288+
}
289+
}
290+
291+
return { accumulated, tokenUsage, receivedDone };
275292
}
276293

277294
/**

src/i18n/en.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ const en = {
5757
'streaming.allowForSession': 'Allow for Session',
5858
'streaming.allowed': 'Allowed',
5959
'streaming.denied': 'Denied',
60+
'streaming.connectionDrop': 'Connection lost — the server stream ended unexpectedly. Claude may still be running in the background. Please try sending your message again.',
6061

6162
// ── Chat view / session page ────────────────────────────────
6263
'chat.newConversation': 'New Conversation',

src/i18n/zh.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ const zh: Record<TranslationKey, string> = {
5454
'streaming.allowForSession': '本次会话允许',
5555
'streaming.allowed': '已允许',
5656
'streaming.denied': '已拒绝',
57+
'streaming.connectionDrop': '连接中断 — 服务器流意外结束。Claude 可能仍在后台运行,请尝试重新发送消息。',
5758

5859
// ── Chat view / session page ────────────────────────────────
5960
'chat.newConversation': '新对话',

src/lib/claude-client.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,8 @@ export function streamClaude(options: ClaudeStreamOptions): ReadableStream<strin
397397
generativeUI,
398398
} = options;
399399

400+
let heartbeatTimer: ReturnType<typeof setInterval> | null = null;
401+
400402
return new ReadableStream<string>({
401403
async start(controller) {
402404
// Resolve provider via the unified resolver. The caller may pass an explicit
@@ -824,6 +826,22 @@ export function streamClaude(options: ClaudeStreamOptions): ReadableStream<strin
824826
let tokenUsage: TokenUsage | null = null;
825827
// Track pending TodoWrite tool_use_ids so we can sync after successful execution
826828
const pendingTodoWrites = new Map<string, Array<{ content: string; status: string; activeForm?: string }>>();
829+
830+
// Server-side heartbeat: send keep_alive every 30s to prevent
831+
// transport-level idle connection drops (Electron, OS TCP, proxies).
832+
// This is independent of the SDK's own keep_alive messages.
833+
heartbeatTimer = setInterval(() => {
834+
try {
835+
controller.enqueue(formatSSE({ type: 'keep_alive', data: '' }));
836+
} catch {
837+
// Controller may be closed — stop heartbeat
838+
if (heartbeatTimer) {
839+
clearInterval(heartbeatTimer);
840+
heartbeatTimer = null;
841+
}
842+
}
843+
}, 30_000);
844+
827845
for await (const message of conversation) {
828846
if (abortController?.signal.aborted) {
829847
break;
@@ -1059,9 +1077,11 @@ export function streamClaude(options: ClaudeStreamOptions): ReadableStream<strin
10591077
}
10601078
}
10611079

1080+
if (heartbeatTimer) { clearInterval(heartbeatTimer); heartbeatTimer = null; }
10621081
controller.enqueue(formatSSE({ type: 'done', data: '' }));
10631082
controller.close();
10641083
} catch (error) {
1084+
if (heartbeatTimer) { clearInterval(heartbeatTimer); heartbeatTimer = null; }
10651085
const rawMessage = error instanceof Error ? error.message : 'Unknown error';
10661086
// Log full error details for debugging (visible in terminal / dev tools)
10671087
const stderrContent = error instanceof Error ? (error as { stderr?: string }).stderr : undefined;
@@ -1124,6 +1144,7 @@ export function streamClaude(options: ClaudeStreamOptions): ReadableStream<strin
11241144
},
11251145

11261146
cancel() {
1147+
if (heartbeatTimer) { clearInterval(heartbeatTimer); heartbeatTimer = null; }
11271148
abortController?.abort();
11281149
},
11291150
});

src/lib/stream-session-manager.ts

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,42 @@ async function runStream(stream: ActiveStream, params: StartStreamParams): Promi
358358
},
359359
});
360360

361+
// Detect premature stream end (connection drop without server 'done' event)
362+
if (!result.receivedDone) {
363+
cleanupTimers(stream);
364+
365+
const dropMsg = 'Connection lost — the server stream ended unexpectedly. Claude may still be running in the background. Please try sending your message again.';
366+
const errContent = stream.accumulatedText.trim()
367+
? stream.accumulatedText.trim() + `\n\n**Error:** ${dropMsg}`
368+
: `**Error:** ${dropMsg}`;
369+
370+
stream.snapshot = {
371+
...buildSnapshot(stream),
372+
phase: 'error',
373+
completedAt: Date.now(),
374+
error: dropMsg,
375+
finalMessageContent: errContent,
376+
statusText: undefined,
377+
pendingPermission: null,
378+
permissionResolved: null,
379+
};
380+
stream.accumulatedText = '';
381+
stream.toolUsesArray = [];
382+
stream.toolResultsArray = [];
383+
stream.toolOutputAccumulated = '';
384+
emit(stream, 'completed');
385+
386+
// Clear stale SDK session so next message starts fresh
387+
fetch(`/api/chat/sessions/${encodeURIComponent(stream.sessionId)}`, {
388+
method: 'PATCH',
389+
headers: { 'Content-Type': 'application/json' },
390+
body: JSON.stringify({ sdk_session_id: '' }),
391+
}).catch(() => {});
392+
393+
scheduleGC(stream);
394+
return;
395+
}
396+
361397
// Stream completed successfully — build final message content
362398
const accumulated = result.accumulated;
363399
const finalToolUses = stream.toolUsesArray;

0 commit comments

Comments
 (0)