Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/lib/__tests__/add-edge-modal.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ vi.mock("@/lib/mock-data", () => ({
// ---------------------------------------------------------------------------
let mockActiveModal: string | null = null
let mockSourceRefId: string | null = null
const mockClose = vi.fn()
let mockClose = vi.fn()

vi.mock("@/stores/modal-store", () => ({
useModalStore: (sel: (s: Record<string, unknown>) => unknown) =>
Expand Down Expand Up @@ -74,6 +74,7 @@ function closeModal() {
// ---------------------------------------------------------------------------
describe("AddEdgeModal", () => {
beforeEach(() => {
mockClose = vi.fn()
vi.clearAllMocks()
mockCreateEdge.mockResolvedValue({})
closeModal()
Expand Down
118 changes: 10 additions & 108 deletions src/lib/agent-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,108 +41,6 @@ async function buildSignedUrl(path: string): Promise<string> {
return url.toString()
}

// Parse a single SSE line and return { event, data } or null
function parseSseLine(line: string): { event: string; data: string } | null {
if (!line || line.startsWith(":")) return null
if (line.startsWith("data: ")) {
return { event: "message", data: line.slice(6) }
}
return null
}

// Process a stream of SSE data using ReadableStream
async function processSSEStream(
reader: ReadableStreamDefaultReader<Uint8Array>,
opts: StreamAgentOpts,
retryFn: () => Promise<void>
): Promise<void> {
const decoder = new TextDecoder()
let buffer = ""
let finalAnswer = ""
let citedRefIds: string[] = []
const activeToolCalls = new Map<string, ToolCallEvent>()

while (true) {
const { done, value } = await reader.read()
if (done) break

buffer += decoder.decode(value, { stream: true })
const lines = buffer.split("\n")
buffer = lines.pop() ?? ""

for (const line of lines) {
const parsed = parseSseLine(line.trim())
if (!parsed) continue

const raw = parsed.data
if (raw === "[DONE]") continue

try {
const chunk = JSON.parse(raw)

// AI SDK UI stream format
if (typeof chunk === "object" && chunk !== null) {
// Text delta
if (chunk.type === "text-delta" || chunk.type === "0") {
const delta = chunk.textDelta ?? chunk.value ?? ""
if (delta) {
finalAnswer += delta
opts.onChunk(delta)
}
}
// Tool call start
else if (chunk.type === "tool-call" || chunk.type === "9") {
const toolName = chunk.toolName ?? chunk.tool ?? ""
const toolCallId = chunk.toolCallId ?? chunk.id ?? String(Date.now())
const toolCall: ToolCallEvent = {
id: toolCallId,
tool: toolName,
params: chunk.args ?? chunk.params ?? {},
status: "in-flight",
}
activeToolCalls.set(toolCallId, toolCall)
opts.onToolCall({ ...toolCall })
}
// Tool result
else if (chunk.type === "tool-result" || chunk.type === "a") {
const toolCallId = chunk.toolCallId ?? chunk.id ?? ""
const existing = activeToolCalls.get(toolCallId)
if (existing) {
const resultCount =
chunk.result?.nodes?.length ?? chunk.result?.count ?? undefined
const updated: ToolCallEvent = {
...existing,
status: "done",
resultCount,
}
activeToolCalls.set(toolCallId, updated)
opts.onToolCall({ ...updated })
}
}
// Done / finish
else if (chunk.type === "done" || chunk.type === "finish-message") {
if (chunk.answer) finalAnswer = chunk.answer
if (Array.isArray(chunk.cited_ref_ids)) citedRefIds = chunk.cited_ref_ids
}
// Error
else if (chunk.type === "error") {
opts.onError(new Error(chunk.error ?? "Agent error"))
return
}
}
} catch {
// Non-JSON lines are plain text deltas (some SSE formats)
if (raw && raw !== "[DONE]") {
finalAnswer += raw
opts.onChunk(raw)
}
}
}
}

opts.onDone({ answer: finalAnswer, cited_ref_ids: citedRefIds })
}

// Mock SSE stream for development
async function mockStreamAgent(
prompt: string,
Expand Down Expand Up @@ -213,7 +111,7 @@ export async function streamAgent(

const headers: Record<string, string> = {
"Content-Type": "application/json",
Accept: "text/event-stream",
Accept: "application/json",
}
if (l402) headers["Authorization"] = l402

Expand All @@ -224,7 +122,6 @@ export async function streamAgent(
headers,
body: JSON.stringify({
prompt,
stream: true,
sessionId: opts.sessionId,
}),
signal: opts.signal,
Expand Down Expand Up @@ -252,13 +149,18 @@ export async function streamAgent(
return
}

const reader = response.body?.getReader()
if (!reader) {
opts.onError(new Error("No response body"))
let data: { answer?: string; cited_ref_ids?: string[] }
try {
data = await response.json()
} catch {
opts.onError(new Error("Invalid JSON from agent"))
return
}

await processSSEStream(reader, opts, () => doRequest(true))
opts.onDone({
answer: data.answer ?? "",
cited_ref_ids: data.cited_ref_ids ?? [],
})
}

return doRequest()
Expand Down
Loading