From 00353042707e06afea607fb559c49d2104199c6e Mon Sep 17 00:00:00 2001 From: tomsmith8 Date: Thu, 21 May 2026 22:59:00 +0000 Subject: [PATCH] Generated with Hive: Refactor streamAgent to async job with SSE events flow --- src/lib/__tests__/add-content-modal.test.tsx | 11 ++ src/lib/__tests__/budget-modal.test.tsx | 3 + src/lib/__tests__/main-area.test.tsx | 12 +- src/lib/__tests__/node-preview-panel.test.tsx | 15 ++- src/lib/__tests__/setup.ts | 6 + src/lib/agent-api.ts | 123 +++++++++--------- 6 files changed, 101 insertions(+), 69 deletions(-) diff --git a/src/lib/__tests__/add-content-modal.test.tsx b/src/lib/__tests__/add-content-modal.test.tsx index c6e8dda..7b2200f 100644 --- a/src/lib/__tests__/add-content-modal.test.tsx +++ b/src/lib/__tests__/add-content-modal.test.tsx @@ -150,6 +150,17 @@ describe("AddContentModal — preview probe", () => { mockIsSubscriptionSource.mockReturnValue(false) }) + afterEach(() => { + // userEvent.type fires handleDetect on every keystroke, each of which starts + // async chains (detectSourceType → checkNodeExists → api.get). After the test + // ends, still-pending microtasks can fire against the next test's mock setup. + // Resetting mocks here makes any stale calls resolve to harmless defaults so + // they don't interfere with subsequent tests. + mockDetectSourceType.mockResolvedValue(null) + mockCheckNodeExists.mockResolvedValue({ exists: false, ref_id: null, status: null }) + mockApiGet.mockResolvedValue({ nodes: [] }) + }) + it("owned (200): auto-routes to player and closes modal", async () => { mockDetectSourceType.mockResolvedValue("youtube_video") mockCheckNodeExists.mockResolvedValue({ diff --git a/src/lib/__tests__/budget-modal.test.tsx b/src/lib/__tests__/budget-modal.test.tsx index 434fefb..cf2b279 100644 --- a/src/lib/__tests__/budget-modal.test.tsx +++ b/src/lib/__tests__/budget-modal.test.tsx @@ -69,6 +69,9 @@ vi.mock("@/lib/sphinx", () => ({ pollPaymentStatus: (...args: unknown[]) => mockPollPaymentStatus(...args), fetchBuyLsatChallenge: (...args: unknown[]) => mockFetchBuyLsatChallenge(...args), fetchTransactionHistory: (...args: unknown[]) => mockFetchTransactionHistory(...args), + savePendingLsat: vi.fn((challenge: unknown, amount?: number) => ({ ...challenge as object, amount: amount ?? 0, createdAt: Date.now() })), + getPendingLsat: vi.fn(() => null), + clearPendingLsat: vi.fn(), })) // --- Mock data --- diff --git a/src/lib/__tests__/main-area.test.tsx b/src/lib/__tests__/main-area.test.tsx index be68b57..610c316 100644 --- a/src/lib/__tests__/main-area.test.tsx +++ b/src/lib/__tests__/main-area.test.tsx @@ -26,10 +26,14 @@ const appState = { myContentOpen: false, clipsOpen: false, followingOpen: false, + agentOpen: false, + workflowsOpen: false, setSourcesOpen: vi.fn(), setMyContentOpen: vi.fn(), setClipsOpen: vi.fn(), setFollowingOpen: vi.fn(), + setAgentOpen: vi.fn(), + setWorkflowsOpen: vi.fn(), } vi.mock("@/stores/app-store", () => ({ @@ -76,6 +80,8 @@ describe("LeftPane pickMode()", () => { appState.myContentOpen = false appState.clipsOpen = false appState.followingOpen = false + appState.agentOpen = false + appState.workflowsOpen = false }) it("shows feed when nothing is open", () => { @@ -105,6 +111,10 @@ describe("LeftPane pickMode()", () => { render() expect(screen.getByTestId("clips-panel")).toBeTruthy() expect(screen.queryByTestId("node-preview-panel")).toBeNull() - expect(screen.queryByTestId("feed-view")).toBeNull() + // FeedView is always in the DOM but wrapped in a Tailwind "hidden" div when + // not in feed mode. jsdom doesn't evaluate CSS classes, so we check the + // wrapper element carries the "hidden" class instead. + const feedEl = screen.queryByTestId("feed-view") + expect(feedEl?.parentElement?.classList.contains("hidden")).toBe(true) }) }) diff --git a/src/lib/__tests__/node-preview-panel.test.tsx b/src/lib/__tests__/node-preview-panel.test.tsx index 48b2023..95ba79a 100644 --- a/src/lib/__tests__/node-preview-panel.test.tsx +++ b/src/lib/__tests__/node-preview-panel.test.tsx @@ -194,11 +194,16 @@ describe("NodePreviewPanel – price display", () => { }) it("renders 'Unlock for 10 sats' when 402 body has price: 10", async () => { - mockApiGet.mockRejectedValue( - new Response(JSON.stringify({ price: 10 }), { - status: 402, - headers: { "Content-Type": "application/json" }, - }) + // Use mockImplementation (not mockRejectedValue) so each call gets a fresh + // Response instance — a Response body can only be consumed once, and stale + // async effects from prior tests can otherwise exhaust the shared instance. + mockApiGet.mockImplementation(() => + Promise.reject( + new Response(JSON.stringify({ price: 10 }), { + status: 402, + headers: { "Content-Type": "application/json" }, + }) + ) ) render() diff --git a/src/lib/__tests__/setup.ts b/src/lib/__tests__/setup.ts index df6631e..89cf7f5 100644 --- a/src/lib/__tests__/setup.ts +++ b/src/lib/__tests__/setup.ts @@ -1 +1,7 @@ import "@testing-library/jest-dom" + +// jsdom does not implement Element.prototype.scrollTo — stub it so components +// that call element.scrollTo({ top: 0 }) don't throw in tests. +if (typeof Element !== "undefined" && !Element.prototype.scrollTo) { + Element.prototype.scrollTo = () => undefined +} diff --git a/src/lib/agent-api.ts b/src/lib/agent-api.ts index df4e4be..1a5e890 100644 --- a/src/lib/agent-api.ts +++ b/src/lib/agent-api.ts @@ -50,17 +50,15 @@ function parseSseLine(line: string): { event: string; data: string } | null { return null } -// Process a stream of SSE data using ReadableStream -async function processSSEStream( +// Process the async event bus SSE stream +async function processEventStream( reader: ReadableStreamDefaultReader, - opts: StreamAgentOpts, - retryFn: () => Promise + opts: StreamAgentOpts ): Promise { const decoder = new TextDecoder() let buffer = "" let finalAnswer = "" let citedRefIds: string[] = [] - const activeToolCalls = new Map() while (true) { const { done, value } = await reader.read() @@ -73,69 +71,38 @@ async function processSSEStream( for (const line of lines) { const parsed = parseSseLine(line.trim()) if (!parsed) continue - const raw = parsed.data if (raw === "[DONE]") continue try { const chunk = JSON.parse(raw) + if (typeof chunk !== "object" || chunk === null) continue - // AI SDK UI stream format - if (typeof chunk === "object" && chunk !== null) { - // Text delta - if (chunk.type === "text-delta" || chunk.type === "0") { - const delta = chunk.textDelta ?? chunk.value ?? "" - if (delta) { - finalAnswer += delta - opts.onChunk(delta) - } - } - // Tool call start - else if (chunk.type === "tool-call" || chunk.type === "9") { - const toolName = chunk.toolName ?? chunk.tool ?? "" - const toolCallId = chunk.toolCallId ?? chunk.id ?? String(Date.now()) - const toolCall: ToolCallEvent = { - id: toolCallId, - tool: toolName, - params: chunk.args ?? chunk.params ?? {}, - status: "in-flight", - } - activeToolCalls.set(toolCallId, toolCall) - opts.onToolCall({ ...toolCall }) + if (chunk.type === "text") { + const text = chunk.text ?? "" + if (text) { + finalAnswer += text + opts.onChunk(text) } - // Tool result - else if (chunk.type === "tool-result" || chunk.type === "a") { - const toolCallId = chunk.toolCallId ?? chunk.id ?? "" - const existing = activeToolCalls.get(toolCallId) - if (existing) { - const resultCount = - chunk.result?.nodes?.length ?? chunk.result?.count ?? undefined - const updated: ToolCallEvent = { - ...existing, - status: "done", - resultCount, - } - activeToolCalls.set(toolCallId, updated) - opts.onToolCall({ ...updated }) - } - } - // Done / finish - else if (chunk.type === "done" || chunk.type === "finish-message") { - if (chunk.answer) finalAnswer = chunk.answer - if (Array.isArray(chunk.cited_ref_ids)) citedRefIds = chunk.cited_ref_ids - } - // Error - else if (chunk.type === "error") { - opts.onError(new Error(chunk.error ?? "Agent error")) - return + } else if (chunk.type === "tool_call") { + const id = chunk.toolName + "-" + Date.now() + opts.onToolCall({ + id, + tool: chunk.toolName ?? "", + params: chunk.input ?? {}, + status: "in-flight", + }) + } else if (chunk.type === "done") { + if (chunk.result?.answer) finalAnswer = chunk.result.answer + if (Array.isArray(chunk.result?.cited_ref_ids)) { + citedRefIds = chunk.result.cited_ref_ids } + } else if (chunk.type === "error") { + opts.onError(new Error(chunk.error ?? "Agent error")) + return } } catch { - // Non-JSON lines are plain text deltas (some SSE formats) - if (raw && raw !== "[DONE]") { - finalAnswer += raw - opts.onChunk(raw) - } + // skip non-JSON lines } } } @@ -213,7 +180,7 @@ export async function streamAgent( const headers: Record = { "Content-Type": "application/json", - Accept: "text/event-stream", + Accept: "application/json", } if (l402) headers["Authorization"] = l402 @@ -224,7 +191,6 @@ export async function streamAgent( headers, body: JSON.stringify({ prompt, - stream: true, sessionId: opts.sessionId, }), signal: opts.signal, @@ -252,13 +218,44 @@ export async function streamAgent( return } - const reader = response.body?.getReader() + let startPayload: { request_id: string; sessionId?: string } + try { + startPayload = await response.json() + } catch { + opts.onError(new Error("Invalid JSON from agent")) + return + } + const { request_id } = startPayload + if (!request_id) { + opts.onError(new Error("No request_id in agent response")) + return + } + + const eventsUrl = await buildSignedUrl(`/v2/agent/events/${request_id}`) + let eventsRes: Response + try { + eventsRes = await fetch(eventsUrl, { + method: "GET", + signal: opts.signal, + }) + } catch (err) { + if (err instanceof DOMException && err.name === "AbortError") return + opts.onError(err instanceof Error ? err : new Error(String(err))) + return + } + + if (!eventsRes.ok) { + opts.onError(new Error(`Events stream failed: ${eventsRes.status}`)) + return + } + + const reader = eventsRes.body?.getReader() if (!reader) { - opts.onError(new Error("No response body")) + opts.onError(new Error("No events response body")) return } - await processSSEStream(reader, opts, () => doRequest(true)) + await processEventStream(reader, opts) } return doRequest()