diff --git a/.changeset/fix-close-guard-race.md b/.changeset/fix-close-guard-race.md new file mode 100644 index 000000000000..1a68b8af0f73 --- /dev/null +++ b/.changeset/fix-close-guard-race.md @@ -0,0 +1,5 @@ +--- +'ai': patch +--- + +Add close guard to prevent race condition when multiple tools complete simultaneously during parallel execution diff --git a/packages/ai/src/generate-text/run-tools-transformation.test.ts b/packages/ai/src/generate-text/run-tools-transformation.test.ts index 6adade0f83d5..11dca10cf3a1 100644 --- a/packages/ai/src/generate-text/run-tools-transformation.test.ts +++ b/packages/ai/src/generate-text/run-tools-transformation.test.ts @@ -2,7 +2,7 @@ import { LanguageModelV3StreamPart, LanguageModelV3Usage, } from '@ai-sdk/provider'; -import { delay, tool } from '@ai-sdk/provider-utils'; +import { delay, DelayedPromise, tool } from '@ai-sdk/provider-utils'; import { convertArrayToReadableStream, convertReadableStreamToArray, @@ -1140,4 +1140,177 @@ describe('runToolsTransformation', () => { }); }); }); + + describe('stream close guard', () => { + it('should guard against enqueue after stream closure when generateId returns duplicates', async () => { + // This test verifies the close guard prevents errors when: + // 1. generateId returns the same ID for all tools (framework override scenario) + // 2. First tool completes → stream closes (Set becomes empty after one delete) + // 3. Other tools try to enqueue their results → guarded by `if (!closed)` + // + // Without the guard, this throws "Controller is already closed" + + const toolACompleted = new DelayedPromise(); + const toolBCompleted = new DelayedPromise(); + const toolCCompleted = new DelayedPromise(); + + // Track errors thrown during tool execution + const toolErrors: unknown[] = []; + + const inputStream: ReadableStream = + convertArrayToReadableStream([ + { + type: 'tool-call', + toolCallId: 'call-a', + toolName: 'testTool', + input: `{ "id": "a" }`, + }, + { + type: 'tool-call', + toolCallId: 'call-b', + toolName: 'testTool', + input: `{ "id": "b" }`, + }, + { + type: 'tool-call', + toolCallId: 'call-c', + toolName: 'testTool', + input: `{ "id": "c" }`, + }, + { + type: 'finish', + finishReason: { unified: 'tool-calls', raw: 'tool_calls' }, + usage: testUsage, + }, + ]); + + const transformedStream = runToolsTransformation({ + // Simulate framework behavior: generateId returns same ID for message grouping + // This causes outstandingToolResults Set to only track one entry + generateId: () => 'constant-id', + tools: { + testTool: { + title: 'Test Tool', + inputSchema: z.object({ id: z.string() }), + execute: async ({ id }) => { + try { + if (id === 'a') return await toolACompleted.promise; + if (id === 'b') return await toolBCompleted.promise; + return await toolCCompleted.promise; + } catch (error) { + toolErrors.push(error); + throw error; + } + }, + }, + }, + generatorStream: inputStream, + tracer: new MockTracer(), + telemetry: undefined, + messages: [], + system: undefined, + abortSignal: undefined, + repairToolCall: undefined, + experimental_context: undefined, + }); + + // Start consuming the stream + const resultPromise = convertReadableStreamToArray(transformedStream); + + // Tool A completes first - this will close the stream because: + // - All 3 tools added same ID to Set (only 1 entry) + // - Tool A's finally() deletes that ID → Set is empty + // - attemptClose() sees empty Set → closes stream + toolACompleted.resolve('result-a'); + + // Allow microtasks to process + await Promise.resolve(); + await Promise.resolve(); + + // Tools B and C complete after stream is closed + // Without the `if (!closed)` guard, the enqueue in .then() throws + toolBCompleted.resolve('result-b'); + toolCCompleted.resolve('result-c'); + + // Wait for stream to complete and all promises to settle + const result = await resultPromise; + + // Stream should complete with finish chunk + expect(result.some(r => r.type === 'finish')).toBe(true); + + // The close guard silently drops results for tools B and C (stream already closed) + // Without the guard, trying to enqueue would throw "Controller is already closed" + // which would surface as error chunks or unhandled rejections + const errorChunks = result.filter(r => r.type === 'error'); + expect( + errorChunks, + 'With close guard, no error chunks should be emitted', + ).toHaveLength(0); + }); + + it('should handle multiple tools completing without delay operations', async () => { + // Simple test using DelayedPromise without any real-time delays + const toolA = new DelayedPromise(); + const toolB = new DelayedPromise(); + + const inputStream: ReadableStream = + convertArrayToReadableStream([ + { + type: 'tool-call', + toolCallId: 'call-a', + toolName: 'testTool', + input: `{ "id": "a" }`, + }, + { + type: 'tool-call', + toolCallId: 'call-b', + toolName: 'testTool', + input: `{ "id": "b" }`, + }, + { + type: 'finish', + finishReason: { unified: 'tool-calls', raw: 'tool_calls' }, + usage: testUsage, + }, + ]); + + const transformedStream = runToolsTransformation({ + generateId: mockId({ prefix: 'id' }), + tools: { + testTool: { + title: 'Test Tool', + inputSchema: z.object({ id: z.string() }), + execute: async ({ id }) => { + if (id === 'a') return toolA.promise; + return toolB.promise; + }, + }, + }, + generatorStream: inputStream, + tracer: new MockTracer(), + telemetry: undefined, + messages: [], + system: undefined, + abortSignal: undefined, + repairToolCall: undefined, + experimental_context: undefined, + }); + + // Start consuming - resolve tools in sequence + const resultPromise = convertReadableStreamToArray(transformedStream); + + // Resolve both tools + toolA.resolve('result-a'); + toolB.resolve('result-b'); + + const result = await resultPromise; + + // Verify all results captured + const toolResults = result.filter(r => r.type === 'tool-result'); + const finishChunks = result.filter(r => r.type === 'finish'); + + expect(toolResults).toHaveLength(2); + expect(finishChunks).toHaveLength(1); + }); + }); }); diff --git a/packages/ai/src/generate-text/run-tools-transformation.ts b/packages/ai/src/generate-text/run-tools-transformation.ts index 29865865aad7..635849812519 100644 --- a/packages/ai/src/generate-text/run-tools-transformation.ts +++ b/packages/ai/src/generate-text/run-tools-transformation.ts @@ -150,13 +150,23 @@ export function runToolsTransformation({ const toolCallsByToolCallId = new Map>(); let canClose = false; + let closed = false; // Prevent race condition when multiple tools complete simultaneously let finishChunk: | (SingleRequestTextStreamPart & { type: 'finish' }) | undefined = undefined; function attemptClose() { + // Prevent re-entry: if already closed, nothing to do + if (closed) { + return; + } + // close the tool results controller if no more outstanding tool calls if (canClose && outstandingToolResults.size === 0) { + // Mark as closed BEFORE doing any work to prevent race conditions + // where multiple finally() blocks call attemptClose() simultaneously + closed = true; + // we delay sending the finish chunk until all tool results (incl. delayed ones) // are received to ensure that the frontend receives tool results before a message // finish event arrives. @@ -324,17 +334,26 @@ export function runToolsTransformation({ abortSignal, experimental_context, onPreliminaryToolResult: result => { - toolResultsStreamController!.enqueue(result); + // Guard against enqueueing after stream is closed + if (!closed) { + toolResultsStreamController!.enqueue(result); + } }, }) .then(result => { - toolResultsStreamController!.enqueue(result); + // Guard against enqueueing after stream is closed + if (!closed) { + toolResultsStreamController!.enqueue(result); + } }) .catch(error => { - toolResultsStreamController!.enqueue({ - type: 'error', - error, - }); + // Guard against enqueueing after stream is closed + if (!closed) { + toolResultsStreamController!.enqueue({ + type: 'error', + error, + }); + } }) .finally(() => { outstandingToolResults.delete(toolExecutionId);