Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/fix-close-guard-race.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'ai': patch
---

Add close guard to prevent race condition when multiple tools complete simultaneously during parallel execution
175 changes: 174 additions & 1 deletion packages/ai/src/generate-text/run-tools-transformation.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import {
LanguageModelV3StreamPart,
LanguageModelV3Usage,
} from '@ai-sdk/provider';
import { delay, tool } from '@ai-sdk/provider-utils';
import { delay, DelayedPromise, tool } from '@ai-sdk/provider-utils';
import {
convertArrayToReadableStream,
convertReadableStreamToArray,
Expand Down Expand Up @@ -1140,4 +1140,177 @@ describe('runToolsTransformation', () => {
});
});
});

describe('stream close guard', () => {
it('should guard against enqueue after stream closure when generateId returns duplicates', async () => {
// This test verifies the close guard prevents errors when:
// 1. generateId returns the same ID for all tools (framework override scenario)
// 2. First tool completes → stream closes (Set becomes empty after one delete)
// 3. Other tools try to enqueue their results → guarded by `if (!closed)`
//
// Without the guard, this throws "Controller is already closed"

const toolACompleted = new DelayedPromise<string>();
const toolBCompleted = new DelayedPromise<string>();
const toolCCompleted = new DelayedPromise<string>();

// Track errors thrown during tool execution
const toolErrors: unknown[] = [];

const inputStream: ReadableStream<LanguageModelV3StreamPart> =
convertArrayToReadableStream([
{
type: 'tool-call',
toolCallId: 'call-a',
toolName: 'testTool',
input: `{ "id": "a" }`,
},
{
type: 'tool-call',
toolCallId: 'call-b',
toolName: 'testTool',
input: `{ "id": "b" }`,
},
{
type: 'tool-call',
toolCallId: 'call-c',
toolName: 'testTool',
input: `{ "id": "c" }`,
},
{
type: 'finish',
finishReason: { unified: 'tool-calls', raw: 'tool_calls' },
usage: testUsage,
},
]);

const transformedStream = runToolsTransformation({
// Simulate framework behavior: generateId returns same ID for message grouping
// This causes outstandingToolResults Set to only track one entry
generateId: () => 'constant-id',
Comment on lines +1188 to +1190
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not the framework behavior. by default different ids are generated. if generateId returns the same id several times this is a misconfiguration.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

once this is changed to generateId: mockId() the test passes without the fix.

is this a misconfiguration and not an ai sdk bug?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the expectation is that generateId() generates unique ids. this might need to be clarified in the documentation / jsdoc

tools: {
testTool: {
title: 'Test Tool',
inputSchema: z.object({ id: z.string() }),
execute: async ({ id }) => {
try {
if (id === 'a') return await toolACompleted.promise;
if (id === 'b') return await toolBCompleted.promise;
return await toolCCompleted.promise;
} catch (error) {
toolErrors.push(error);
throw error;
}
},
},
},
generatorStream: inputStream,
tracer: new MockTracer(),
telemetry: undefined,
messages: [],
system: undefined,
abortSignal: undefined,
repairToolCall: undefined,
experimental_context: undefined,
});

// Start consuming the stream
const resultPromise = convertReadableStreamToArray(transformedStream);

// Tool A completes first - this will close the stream because:
// - All 3 tools added same ID to Set (only 1 entry)
// - Tool A's finally() deletes that ID → Set is empty
// - attemptClose() sees empty Set → closes stream
toolACompleted.resolve('result-a');

// Allow microtasks to process
await Promise.resolve();
await Promise.resolve();

// Tools B and C complete after stream is closed
// Without the `if (!closed)` guard, the enqueue in .then() throws
toolBCompleted.resolve('result-b');
toolCCompleted.resolve('result-c');

// Wait for stream to complete and all promises to settle
const result = await resultPromise;

// Stream should complete with finish chunk
expect(result.some(r => r.type === 'finish')).toBe(true);

// The close guard silently drops results for tools B and C (stream already closed)
// Without the guard, trying to enqueue would throw "Controller is already closed"
// which would surface as error chunks or unhandled rejections
const errorChunks = result.filter(r => r.type === 'error');
expect(
errorChunks,
'With close guard, no error chunks should be emitted',
).toHaveLength(0);
});

it('should handle multiple tools completing without delay operations', async () => {
// Simple test using DelayedPromise without any real-time delays
const toolA = new DelayedPromise<string>();
const toolB = new DelayedPromise<string>();

const inputStream: ReadableStream<LanguageModelV3StreamPart> =
convertArrayToReadableStream([
{
type: 'tool-call',
toolCallId: 'call-a',
toolName: 'testTool',
input: `{ "id": "a" }`,
},
{
type: 'tool-call',
toolCallId: 'call-b',
toolName: 'testTool',
input: `{ "id": "b" }`,
},
{
type: 'finish',
finishReason: { unified: 'tool-calls', raw: 'tool_calls' },
usage: testUsage,
},
]);

const transformedStream = runToolsTransformation({
generateId: mockId({ prefix: 'id' }),
tools: {
testTool: {
title: 'Test Tool',
inputSchema: z.object({ id: z.string() }),
execute: async ({ id }) => {
if (id === 'a') return toolA.promise;
return toolB.promise;
},
},
},
generatorStream: inputStream,
tracer: new MockTracer(),
telemetry: undefined,
messages: [],
system: undefined,
abortSignal: undefined,
repairToolCall: undefined,
experimental_context: undefined,
});

// Start consuming - resolve tools in sequence
const resultPromise = convertReadableStreamToArray(transformedStream);

// Resolve both tools
toolA.resolve('result-a');
toolB.resolve('result-b');

const result = await resultPromise;

// Verify all results captured
const toolResults = result.filter(r => r.type === 'tool-result');
const finishChunks = result.filter(r => r.type === 'finish');

expect(toolResults).toHaveLength(2);
expect(finishChunks).toHaveLength(1);
});
});
});
31 changes: 25 additions & 6 deletions packages/ai/src/generate-text/run-tools-transformation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,13 +150,23 @@ export function runToolsTransformation<TOOLS extends ToolSet>({
const toolCallsByToolCallId = new Map<string, TypedToolCall<TOOLS>>();

let canClose = false;
let closed = false; // Prevent race condition when multiple tools complete simultaneously
let finishChunk:
| (SingleRequestTextStreamPart<TOOLS> & { type: 'finish' })
| undefined = undefined;

function attemptClose() {
// Prevent re-entry: if already closed, nothing to do
if (closed) {
return;
}

// close the tool results controller if no more outstanding tool calls
if (canClose && outstandingToolResults.size === 0) {
// Mark as closed BEFORE doing any work to prevent race conditions
// where multiple finally() blocks call attemptClose() simultaneously
closed = true;

// we delay sending the finish chunk until all tool results (incl. delayed ones)
// are received to ensure that the frontend receives tool results before a message
// finish event arrives.
Expand Down Expand Up @@ -324,17 +334,26 @@ export function runToolsTransformation<TOOLS extends ToolSet>({
abortSignal,
experimental_context,
onPreliminaryToolResult: result => {
toolResultsStreamController!.enqueue(result);
// Guard against enqueueing after stream is closed
if (!closed) {
toolResultsStreamController!.enqueue(result);
}
},
})
.then(result => {
toolResultsStreamController!.enqueue(result);
// Guard against enqueueing after stream is closed
if (!closed) {
toolResultsStreamController!.enqueue(result);
}
})
.catch(error => {
toolResultsStreamController!.enqueue({
type: 'error',
error,
});
// Guard against enqueueing after stream is closed
if (!closed) {
toolResultsStreamController!.enqueue({
type: 'error',
error,
});
}
})
.finally(() => {
outstandingToolResults.delete(toolExecutionId);
Expand Down
Loading