Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 37 additions & 21 deletions src/core/assistant-message/NativeToolCallParser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,12 @@ export class NativeToolCallParser {
private static rawChunkTracker = new Map<
number,
{
id: string
id?: string
name: string
// Tracks whether a name field has been observed at all, distinct from a
// truthy name. This is a defensive guard: should a provider ever send an
// empty name, the start-gate must not rely on name truthiness.
nameSeen: boolean
hasStarted: boolean
deltaBuffer: string[]
}
Expand Down Expand Up @@ -105,51 +109,55 @@ export class NativeToolCallParser {
const events: ToolCallStreamEvent[] = []
const { index, id, name, arguments: args } = chunk

// Create the tracker on first sight of this index, independent of whether
// an id has arrived yet. Keying the lifecycle by index (not id) ensures any
// `arguments` that stream before the id is known are buffered rather than dropped.
let tracked = this.rawChunkTracker.get(index)

// Initialize new tool call tracking when we receive an id
if (id && !tracked) {
if (!tracked) {
tracked = {
id,
name: name || "",
nameSeen: name !== undefined,
hasStarted: false,
deltaBuffer: [],
}
this.rawChunkTracker.set(index, tracked)
}

if (!tracked) {
return events
// Record id and name as they arrive (they may come in separate chunks).
if (id) {
tracked.id = id
}

// Update name if present in chunk and not yet set
if (name) {
if (name !== undefined) {
tracked.name = name
Comment thread
awschmeder marked this conversation as resolved.
tracked.nameSeen = true
}

// Emit start event when we have the name
if (!tracked.hasStarted && tracked.name) {
// Emit start event only once both id and name are known. Using a local
// non-null id keeps emitted events typed as id: string.
if (!tracked.hasStarted && tracked.id && tracked.nameSeen) {
const startedId = tracked.id
events.push({
type: "tool_call_start",
id: tracked.id,
id: startedId,
name: tracked.name,
})
tracked.hasStarted = true

// Flush buffered deltas
// Flush buffered deltas accumulated during the pre-start window.
for (const bufferedDelta of tracked.deltaBuffer) {
events.push({
type: "tool_call_delta",
id: tracked.id,
id: startedId,
delta: bufferedDelta,
})
}
tracked.deltaBuffer = []
}

// Emit delta event for argument chunks
// Emit delta event for argument chunks, buffering until start is emitted.
if (args) {
if (tracked.hasStarted) {
if (tracked.hasStarted && tracked.id) {
events.push({
type: "tool_call_delta",
id: tracked.id,
Expand All @@ -172,11 +180,19 @@ export class NativeToolCallParser {

if (finishReason === "tool_calls" && this.rawChunkTracker.size > 0) {
for (const [, tracked] of this.rawChunkTracker.entries()) {
events.push({
type: "tool_call_end",
id: tracked.id,
})
// Only emit an end for trackers that actually started. A tracker that
// never received an id/name (malformed stream) must not emit a phantom
// end; since start requires an id, hasStarted implies tracked.id is set.
if (tracked.hasStarted && tracked.id) {
Comment thread
awschmeder marked this conversation as resolved.
events.push({
type: "tool_call_end",
id: tracked.id,
})
}
}
// Clear so a subsequent finalizeRawChunks() is a safe no-op and cannot
// double-fire end events for the same trackers.
this.rawChunkTracker.clear()
}

return events
Expand All @@ -191,7 +207,7 @@ export class NativeToolCallParser {

if (this.rawChunkTracker.size > 0) {
for (const [, tracked] of this.rawChunkTracker.entries()) {
if (tracked.hasStarted) {
if (tracked.hasStarted && tracked.id) {
events.push({
type: "tool_call_end",
id: tracked.id,
Expand Down
254 changes: 253 additions & 1 deletion src/core/assistant-message/__tests__/NativeToolCallParser.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { NativeToolCallParser } from "../NativeToolCallParser"
import { NativeToolCallParser, type ToolCallStreamEvent } from "../NativeToolCallParser"

describe("NativeToolCallParser", () => {
beforeEach(() => {
Expand Down Expand Up @@ -343,4 +343,256 @@ describe("NativeToolCallParser", () => {
})
})
})

describe("processRawChunk streaming reassembly", () => {
// Mirror the sequencing Task.ts performs: feed each raw chunk through
// processRawChunk, drive startStreamingToolCall on tool_call_start, feed
// tool_call_delta into processStreamingChunk, and emit ends at stream close
// via finalizeRawChunks() (the same call Task.ts makes after the stream ends).
// Returns the ordered event types/ids plus the finalized tool uses by id.
const drive = (rawChunks: Array<{ index: number; id?: string; name?: string; arguments?: string }>) => {
const events: ToolCallStreamEvent[] = []

const handleEvent = (event: ToolCallStreamEvent) => {
events.push(event)
if (event.type === "tool_call_start") {
NativeToolCallParser.startStreamingToolCall(event.id, event.name)
} else if (event.type === "tool_call_delta") {
NativeToolCallParser.processStreamingChunk(event.id, event.delta)
}
}

for (const chunk of rawChunks) {
for (const event of NativeToolCallParser.processRawChunk(chunk)) {
handleEvent(event)
}
}

// Task.ts finalizes any tool calls still open at stream end via
// finalizeRawChunks(), which emits the tool_call_end events.
for (const event of NativeToolCallParser.finalizeRawChunks()) {
handleEvent(event)
}

const finalized = new Map<string, ReturnType<typeof NativeToolCallParser.finalizeStreamingToolCall>>()
const startIds = events.filter((e) => e.type === "tool_call_start").map((e) => e.id)
for (const id of startIds) {
finalized.set(id, NativeToolCallParser.finalizeStreamingToolCall(id))
}

return { events, finalized }
}

it("preserves leading argument bytes that arrive before the id", () => {
// First chunk carries arguments but NO id; id+name arrive later, then more args.
const fullArgs = JSON.stringify({ path: "src/leading.ts", mode: "slice" })
const firstHalf = fullArgs.slice(0, 10)
const secondHalf = fullArgs.slice(10)

const { events, finalized } = drive([
{ index: 0, arguments: firstHalf },
{ index: 0, id: "call_late_id", name: "read_file" },
{ index: 0, arguments: secondHalf },
])

// Exactly one start, in the right order, with the late id.
const starts = events.filter((e) => e.type === "tool_call_start")
expect(starts).toHaveLength(1)
expect(starts[0].id).toBe("call_late_id")

// The finalized arguments must contain the complete, uncorrupted payload.
const result = finalized.get("call_late_id")
expect(result).not.toBeNull()
expect(result?.type).toBe("tool_use")
if (result?.type === "tool_use") {
const nativeArgs = result.nativeArgs as { path: string; mode?: string }
expect(nativeArgs.path).toBe("src/leading.ts")
expect(nativeArgs.mode).toBe("slice")
}
})

it("handles id and name arriving in separate chunks (issue #218)", () => {
Comment thread
awschmeder marked this conversation as resolved.
const fullArgs = JSON.stringify({ path: "src/split.ts" })

const { events, finalized } = drive([
{ index: 0, id: "call_split" },
{ index: 0, name: "read_file" },
{ index: 0, arguments: fullArgs },
])

const starts = events.filter((e) => e.type === "tool_call_start")
expect(starts).toHaveLength(1)
expect(starts[0].id).toBe("call_split")

const result = finalized.get("call_split")
expect(result?.type).toBe("tool_use")
if (result?.type === "tool_use") {
const nativeArgs = result.nativeArgs as { path: string }
expect(nativeArgs.path).toBe("src/split.ts")
}
})

it("handles name arriving before id with buffered args in between (reverse ordering)", () => {
const fullArgs = JSON.stringify({ path: "src/reverse.ts" })
const firstHalf = fullArgs.slice(0, 9)
const secondHalf = fullArgs.slice(9)

const { events, finalized } = drive([
{ index: 0, name: "read_file" },
{ index: 0, arguments: firstHalf },
{ index: 0, id: "call_reverse" },
{ index: 0, arguments: secondHalf },
])

// Start must not fire until the id arrives, so exactly one start with the late id.
const starts = events.filter((e) => e.type === "tool_call_start")
expect(starts).toHaveLength(1)
expect(starts[0].id).toBe("call_reverse")

// The buffered delta must be flushed only after the start event.
const startIndex = events.findIndex((e) => e.type === "tool_call_start")
const firstDeltaIndex = events.findIndex((e) => e.type === "tool_call_delta")
expect(startIndex).toBeLessThan(firstDeltaIndex)

const result = finalized.get("call_reverse")
expect(result).not.toBeNull()
expect(result?.type).toBe("tool_use")
if (result?.type === "tool_use") {
expect((result.nativeArgs as { path: string }).path).toBe("src/reverse.ts")
}
})

it("keeps two parallel tool calls on distinct indices isolated", () => {
const argsA = JSON.stringify({ path: "src/a.ts" })
const argsB = JSON.stringify({ path: "src/b.ts" })

const { events, finalized } = drive([
{ index: 0, arguments: argsA.slice(0, 8) },
{ index: 1, arguments: argsB.slice(0, 8) },
{ index: 0, id: "call_a", name: "read_file" },
{ index: 1, id: "call_b", name: "read_file" },
{ index: 0, arguments: argsA.slice(8) },
{ index: 1, arguments: argsB.slice(8) },
])

const starts = events.filter((e) => e.type === "tool_call_start")
expect(starts).toHaveLength(2)

const resultA = finalized.get("call_a")
const resultB = finalized.get("call_b")
expect(resultA).not.toBeNull()
expect(resultB).not.toBeNull()
if (resultA?.type === "tool_use") {
Comment thread
awschmeder marked this conversation as resolved.
expect((resultA.nativeArgs as { path: string }).path).toBe("src/a.ts")
}
if (resultB?.type === "tool_use") {
expect((resultB.nativeArgs as { path: string }).path).toBe("src/b.ts")
}
})

it("emits the same event sequence for the single-chunk-with-id flow (regression guard)", () => {
const fullArgs = JSON.stringify({ path: "src/single.ts" })

const { events, finalized } = drive([
{ index: 0, id: "call_single", name: "read_file", arguments: fullArgs },
])

expect(events.map((e) => e.type)).toEqual(["tool_call_start", "tool_call_delta", "tool_call_end"])
expect(events.every((e) => e.id === "call_single")).toBe(true)

const result = finalized.get("call_single")
expect(result).not.toBeNull()
expect(result?.type).toBe("tool_use")
if (result?.type === "tool_use") {
Comment thread
awschmeder marked this conversation as resolved.
expect((result.nativeArgs as { path: string }).path).toBe("src/single.ts")
}
})

it("does not emit a phantom tool_call_end for a tracker that never received an id", () => {
const { events } = drive([{ index: 0, arguments: '{"path":"orphan.ts"}' }])

expect(events.filter((e) => e.type === "tool_call_start")).toHaveLength(0)
expect(events.filter((e) => e.type === "tool_call_end")).toHaveLength(0)
})

it("finalizeRawChunks() emits end events and guards against missing id", () => {
// Simulate a started tool call: process chunks to populate state
const chunks = [
{ index: 0, id: "call_finalize", name: "read_file" },
{ index: 0, arguments: '{"path":"file.ts"' },
{ index: 0, arguments: ',"mode":"slice"}' },
]

const events: Array<{ type: string; id?: string }> = []
for (const chunk of chunks) {
for (const event of NativeToolCallParser.processRawChunk(chunk)) {
events.push(event)
if (event.type === "tool_call_start") {
NativeToolCallParser.startStreamingToolCall(event.id, event.name)
} else if (event.type === "tool_call_delta") {
NativeToolCallParser.processStreamingChunk(event.id, event.delta)
}
}
}

// Now finalize the raw chunks to emit the end event
const finalizeEvents = NativeToolCallParser.finalizeRawChunks()
for (const event of finalizeEvents) {
events.push(event)
}

// Verify the end event was produced by finalizeRawChunks
const ends = events.filter((e) => e.type === "tool_call_end")
expect(ends).toHaveLength(1)
expect(ends[0].id).toBe("call_finalize")

// Finalize the tool call to ensure it contains the complete arguments
const result = NativeToolCallParser.finalizeStreamingToolCall("call_finalize")
expect(result?.type).toBe("tool_use")
if (result?.type === "tool_use") {
expect((result.nativeArgs as { path: string }).path).toBe("file.ts")
}
})

it("finalizeRawChunks() does not emit end for tracker without id", () => {
// Start a tracker with arguments but no id, then finalize
const chunks = [{ index: 0, arguments: '{"incomplete":true}' }]

for (const chunk of chunks) {
NativeToolCallParser.processRawChunk(chunk)
}

// Finalize should not emit an end event if id was never set
const finalizeEvents = NativeToolCallParser.finalizeRawChunks()
const ends = finalizeEvents.filter((e) => e.type === "tool_call_end")
expect(ends).toHaveLength(0)

NativeToolCallParser.clearRawChunkState()
})

it("does not double-fire end events across processFinishReason and finalizeRawChunks", () => {
// Drive a started tool call through the raw chunk path.
const chunks = [
{ index: 0, id: "call_dup", name: "read_file" },
{ index: 0, arguments: '{"path":"file.ts"}' },
]
for (const chunk of chunks) {
NativeToolCallParser.processRawChunk(chunk)
}

// Task.ts emits ends via processFinishReason, then calls finalizeRawChunks
// unconditionally. Both must not emit an end for the same tracker.
const finishEvents = NativeToolCallParser.processFinishReason("tool_calls")
const finalizeEvents = NativeToolCallParser.finalizeRawChunks()

const allEnds = [...finishEvents, ...finalizeEvents].filter((e) => e.type === "tool_call_end")
expect(allEnds).toHaveLength(1)
expect(allEnds[0].id).toBe("call_dup")
// finishReason emits the single end; finalize must be a no-op for the same tracker.
expect(finishEvents.filter((e) => e.type === "tool_call_end")).toHaveLength(1)
expect(finalizeEvents.filter((e) => e.type === "tool_call_end")).toHaveLength(0)

NativeToolCallParser.clearRawChunkState()
})
})
})
Loading
Loading