diff --git a/package-lock.json b/package-lock.json index f93d45f..56f73b1 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1023,7 +1023,6 @@ "integrity": "sha512-Rs1bVAIdBs5gbTIKza/tgpMuG1k3U/UMJLWecIMxNdJFDMzcM5LOiLVRYh3PilWEYDIeUDv7bpiHPLPsbydGcw==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "undici-types": "~6.21.0" } @@ -1041,7 +1040,6 @@ "integrity": "sha512-z9VXpC7MWrhfWipitjNdgCauoMLRdIILQsAEV+ZesIzBq/oUlxk0m3ApZuMFCXdnS4U7KrI+l3WRUEGQ8K1QKw==", "devOptional": true, "license": "MIT", - "peer": true, "dependencies": { "@types/prop-types": "*", "csstype": "^3.2.2" @@ -2504,7 +2502,6 @@ "resolved": "https://registry.npmjs.org/react/-/react-18.3.1.tgz", "integrity": "sha512-wS+hAgJShR0KhEvPJArfuPVN1+Hz1t0Y6n5jLrGQbkb4urgPE/0Rve+1kMB1v/oWgHgm4WIcV+i7F2pTVj+2iQ==", "license": "MIT", - "peer": true, "dependencies": { "loose-envify": "^1.1.0" }, @@ -3031,7 +3028,6 @@ "integrity": "sha512-7dxoA6kYvtgWw80265MyqJlkRl4yawIjO7S5MigytjELkX43fV2WsAXzsNfO7sBpPPCF5Gp0+XzHk0DwLCq3xQ==", "hasInstallScript": true, "license": "MIT", - "peer": true, "dependencies": { "node-addon-api": "^8.0.0", "node-gyp-build": "^4.8.0" @@ -4004,7 +4000,6 @@ "resolved": "https://registry.npmjs.org/zod/-/zod-3.25.76.tgz", "integrity": "sha512-gzUt/qt81nXsFGKIFcC3YnfEAx5NkunCfnDlvuBSSFS02bcXu4Lmea0AFIUwbLWxWPx3d9p8S5QoaujKcNQxcQ==", "license": "MIT", - "peer": true, "funding": { "url": "https://github.com/sponsors/colinhacks" } diff --git a/src/agents/BaseAgent.ts b/src/agents/BaseAgent.ts index de7c4b0..12c3d5c 100644 --- a/src/agents/BaseAgent.ts +++ b/src/agents/BaseAgent.ts @@ -1,5 +1,4 @@ import { Anthropic } from "@anthropic-ai/sdk"; -import pRetry from "p-retry"; import { logger } from "../config/logger.js"; import { AnthropicClient } from "../llm/AnthropicClient.js"; @@ -26,51 +25,112 @@ export abstract class BaseAgent { this.client = new AnthropicClient(); } + abstract executeTool(name: string, input: any): Promise; + async *run(input: AgentInput): AsyncGenerator { logger.info(`Agent ${this.name} starting run`); try { - yield* this.executeWithRetry(input); - } catch (error: any) { - logger.error(`Agent ${this.name} failed`, error); - yield { type: "error", content: error.message }; - } - } + const messages = [...input.messages]; + let isRunning = true; + + while (isRunning) { + const stream = this.client.stream( + messages, + this.systemPrompt, + this.tools.length > 0 ? this.tools : undefined + ); + + const toolCalls: Map = new Map(); + let fullResponse = ""; + + for await (const event of stream) { + if (event.type === 'content_block_delta' && event.delta.type === 'text_delta') { + fullResponse += event.delta.text; + yield { type: 'text', content: event.delta.text }; + } else if (event.type === 'content_block_start' && event.content_block.type === 'tool_use') { + const call = { + id: event.content_block.id, + name: event.content_block.name, + input: "" + }; + toolCalls.set(event.index.toString(), call); + yield { + type: 'tool_use', + name: call.name, + id: call.id, + input: {} + }; + } else if (event.type === 'content_block_delta' && event.delta.type === 'input_json_delta') { + const call = toolCalls.get(event.index.toString()); + if (call) { + call.input += event.delta.partial_json; + yield { type: 'tool_use', content: event.delta.partial_json, id: 'delta' }; + } + } else if (event.type === 'message_stop') { + // Handled outside the loop + } + } - private async *executeWithRetry(input: AgentInput): AsyncGenerator { - // We can't easily retry a generator, so we retry the logic that sets it up - // But since we are streaming, retrying usually happens at the Orchestrator level - // This method proxies the stream from AnthropicClient - - // For now, we simply proxy the stream. - // Real retry logic for streaming is complex (need to buffer or restart). - // Orchestrator will handle task-level retries. - - const stream = this.client.stream( - input.messages, - this.systemPrompt, - this.tools.length > 0 ? this.tools : undefined - ); - - for await (const event of stream) { - if (event.type === 'content_block_delta' && event.delta.type === 'text_delta') { - yield { type: 'text', content: event.delta.text }; - } else if (event.type === 'content_block_start' && event.content_block.type === 'tool_use') { - yield { - type: 'tool_use', - name: event.content_block.name, - id: event.content_block.id, - input: {} // Input builds up in deltas + // Construct the assistant's message for context + const assistantMessage: Anthropic.MessageParam = { + role: 'assistant', + content: [] }; - } else if (event.type === 'content_block_delta' && event.delta.type === 'input_json_delta') { - // We need to handle JSON delta accumulation in the Orchestrator or here. - // For simplicity, let's yield raw chunks and let Orchestrator assemble? - // Or better, let's assemble here if we want a clean interface. - // To keep it "blazing fast", we yield small chunks. - yield { type: 'tool_use', content: event.delta.partial_json, id: 'delta' }; // flagging as delta - } else if (event.type === 'message_stop') { - yield { type: 'done' }; + + if (fullResponse) { + (assistantMessage.content as any).push({ type: 'text', text: fullResponse }); + } + + if (toolCalls.size > 0) { + const toolResults: Anthropic.ToolResultBlockParam[] = []; + for (const call of toolCalls.values()) { + const toolInput = JSON.parse(call.input || "{}"); + (assistantMessage.content as any).push({ + type: 'tool_use', + id: call.id, + name: call.name, + input: toolInput + }); + + // Execute tool + logger.info(`Agent ${this.name} executing tool ${call.name}`); + try { + const result = await this.executeTool(call.name, toolInput); + yield { type: 'tool_result', name: call.name, content: result, id: call.id }; + + toolResults.push({ + type: 'tool_result', + tool_use_id: call.id, + content: result + }); + } catch (error: any) { + const errorMsg = `Error executing tool: ${error.message}`; + yield { type: 'error', content: errorMsg }; + toolResults.push({ + type: 'tool_result', + tool_use_id: call.id, + content: errorMsg, + is_error: true + }); + } + } + + messages.push(assistantMessage); + messages.push({ + role: 'user', + content: toolResults + }); + // Continue loop to get LLM's reaction to tool result(s) + } else { + // No tool calls, we are done + isRunning = false; + yield { type: 'done' }; + } } + } catch (error: any) { + logger.error(`Agent ${this.name} failed`, error); + yield { type: "error", content: error.message }; } } } diff --git a/src/agents/CodeReaderAgent.ts b/src/agents/CodeReaderAgent.ts index cc1c40c..111d76c 100644 --- a/src/agents/CodeReaderAgent.ts +++ b/src/agents/CodeReaderAgent.ts @@ -7,7 +7,7 @@ import Anthropic from "@anthropic-ai/sdk"; export class CodeReaderAgent extends BaseAgent { name = "CodeReader"; - systemPrompt = "You are a code reader. specific files or search for files to read."; + systemPrompt = "You are a code reader. Use tools to read specific files or search for files to read."; private astParser = new ASTParser(); tools: Anthropic.Tool[] = [ diff --git a/src/agents/ExplainerAgent.ts b/src/agents/ExplainerAgent.ts index c8fbe0e..4de0a83 100644 --- a/src/agents/ExplainerAgent.ts +++ b/src/agents/ExplainerAgent.ts @@ -1,5 +1,4 @@ -import { BaseAgent, AgentInput } from "./BaseAgent.js"; -import { AnthropicClient } from "../llm/AnthropicClient.js"; +import { BaseAgent } from "./BaseAgent.js"; export class ExplainerAgent extends BaseAgent { name = "Explainer"; @@ -9,4 +8,8 @@ export class ExplainerAgent extends BaseAgent { constructor() { super(); } + + async executeTool(name: string, input: any): Promise { + throw new Error(`Tool ${name} not found`); + } } diff --git a/src/orchestrator/OrchestratorAgent.ts b/src/orchestrator/OrchestratorAgent.ts index 359e814..e66d10a 100644 --- a/src/orchestrator/OrchestratorAgent.ts +++ b/src/orchestrator/OrchestratorAgent.ts @@ -41,79 +41,67 @@ export class OrchestratorAgent { } // 2. Execute Agents (Parallel if multiple) - const promises = classifications.map(async (cls) => { + const iterators = classifications.map((cls) => { const agent = this.registry.get(cls.agent); if (!agent) { - return [{ type: "error", content: `Agent ${cls.agent} not found` } as AgentChunk]; + return (async function* () { + yield { type: "error", content: `Agent ${cls.agent} not found` } as AgentChunk; + })(); } - const chunks: AgentChunk[] = []; - try { - // If it's the Planner, we need to handle it specifically - // But for now, let's just run it. The output will be the plan. - // If we want to AUTO-EXECUTE the plan, we need to intercept. - + return (async function* () { const stream = agent.run({ messages: [{ role: 'user', content: userInput }] }); for await (const chunk of stream) { - chunks.push(chunk); - } - - // If Planner, check for plan - if (cls.agent === "Planner") { - const planner = agent as PlannerAgent; - if (planner.latestPlan) { - chunks.push({ type: "text", content: "\nExecuting Plan...\n" }); - // Execute steps sequentially - for (const step of planner.latestPlan) { - chunks.push({ type: "text", content: `\n> Step: ${step.agent} - ${step.instruction}\n` }); - // Recursive dispatch? Or just get agent and run? - // Recursive dispatch is safer as it goes through classification? - // No, we know the agent. - const stepAgent = this.registry.get(step.agent); - if (stepAgent) { - const stepStream = stepAgent.run({ - messages: [{ role: 'user', content: step.instruction }] - }); - for await (const stepChunk of stepStream) { - chunks.push(stepChunk); + yield chunk; + + // If Planner, check for plan + if (chunk.type === "done" && cls.agent === "Planner") { + const planner = agent as PlannerAgent; + if (planner.latestPlan) { + yield { type: "text", content: "\nExecuting Plan...\n" }; + // Execute steps sequentially + for (const step of planner.latestPlan) { + yield { type: "text", content: `\n> Step: ${step.agent} - ${step.instruction}\n` }; + const stepAgent = this.registry.get(step.agent); + if (stepAgent) { + const stepStream = stepAgent.run({ + messages: [{ role: 'user', content: step.instruction }] + }); + for await (const stepChunk of stepStream) { + yield stepChunk; + } + } else { + yield { type: "error", content: `Agent ${step.agent} not found for step.` }; } - } else { - chunks.push({ type: "error", content: `Agent ${step.agent} not found for step.` }); } + // Clear plan + planner.latestPlan = null; } - // Clear plan - planner.latestPlan = null; } } - - return chunks; - - } catch (error: any) { - logger.error(`Execution failed for ${cls.agent}`, error); - return [{ type: "error", content: error.message } as AgentChunk]; - } + })(); }); - // Resolve all promises (Parallel Execution) - // Note: yielding from parallel promises is tricky for order, - // but for "simultaneous", we want to yield as they come in? - // Generator can't await multiple things in parallel and yield mixed stream easily without helper. - // For simplicity, we'll await all and then yield (which isn't streaming). - // OR we use a helper to race/merge streams. - // Given current architecture, let's await all for "fast" mode batching, - // or just execute sequentially if we want true streaming? - // The user asked for "simultaneously". - - // Simple parallel implementation: Wait for all, then stream results combined. - // True streaming parallel is harder in a single generator. + // Merge streams in parallel + yield* this.mergeStreams(iterators); + } - const results = await Promise.all(promises); - for (const resultChunks of results) { - for (const chunk of resultChunks) { - yield chunk; + private async *mergeStreams(iterators: AsyncGenerator[]): AsyncGenerator { + const nexts = iterators.map(async (it) => ({ it, next: await it.next() })); + const pending = new Set(nexts); + + while (pending.size > 0) { + const finished = await Promise.race(Array.from(pending)); + if (finished.next.done) { + pending.delete(finished); + } else { + yield finished.next.value; + // Replace the promise with the next one from the same iterator + pending.delete(finished); + pending.add(finished.it.next().then(next => ({ it: finished.it, next }))); } } } diff --git a/tests/agents.test.ts b/tests/agents.test.ts index 5c532d2..d03c38d 100644 --- a/tests/agents.test.ts +++ b/tests/agents.test.ts @@ -1,7 +1,53 @@ -import { describe, it, expect } from "vitest"; +import { describe, it, expect, vi } from "vitest"; import { AgentRegistry } from "../src/orchestrator/AgentRegistry.js"; import { ExplainerAgent } from "../src/agents/ExplainerAgent.js"; import { CodeReaderAgent } from "../src/agents/CodeReaderAgent.js"; +import { BaseAgent, AgentChunk } from "../src/agents/BaseAgent.js"; + +// Mock AnthropicClient +vi.mock("../src/llm/AnthropicClient.js", () => { + return { + AnthropicClient: vi.fn().mockImplementation(() => { + return { + stream: vi.fn().mockImplementation(async function* (messages, system, tools) { + // Simple mock streaming logic + // First time, return two tool calls + if (tools && tools.length > 0 && messages.length === 1) { + yield { + type: 'content_block_start', + index: 0, + content_block: { type: 'tool_use', id: 'call_1', name: tools[0].name } + }; + yield { + type: 'content_block_delta', + index: 0, + delta: { type: 'input_json_delta', partial_json: '{"path": "test1.txt"}' } + }; + yield { + type: 'content_block_start', + index: 1, + content_block: { type: 'tool_use', id: 'call_2', name: tools[0].name } + }; + yield { + type: 'content_block_delta', + index: 1, + delta: { type: 'input_json_delta', partial_json: '{"path": "test2.txt"}' } + }; + yield { type: 'message_stop' }; + } else { + // Second time (after tool result), just return text + yield { + type: 'content_block_delta', + index: 0, + delta: { type: 'text_delta', text: 'Both tools executed successfully' } + }; + yield { type: 'message_stop' }; + } + }) + }; + }) + }; +}); describe("AgentRegistry", () => { it("should register and retrieve agents", () => { @@ -14,19 +60,22 @@ describe("AgentRegistry", () => { }); }); -describe("ExplainerAgent", () => { - it("should have correct name and system prompt", () => { - const agent = new ExplainerAgent(); - expect(agent.name).toBe("Explainer"); - expect(agent.systemPrompt).toContain("helpful coding assistant"); - }); -}); - -describe("CodeReaderAgent", () => { - it("should have tools", () => { +describe("BaseAgent Tool Loop", () => { + it("should handle multi-tool execution loop", async () => { const agent = new CodeReaderAgent(); - expect(agent.tools).toBeDefined(); - expect(agent.tools.length).toBeGreaterThan(0); - expect(agent.tools[0].name).toBe("read_file"); + // Mock executeTool + const executeToolSpy = vi.spyOn(agent, 'executeTool').mockResolvedValue("Mocked tool result"); + + const chunks: AgentChunk[] = []; + for await (const chunk of agent.run({ messages: [{ role: 'user', content: 'read test1.txt and test2.txt' }] })) { + chunks.push(chunk); + } + + expect(executeToolSpy).toHaveBeenCalledWith("read_file", { path: "test1.txt" }); + expect(executeToolSpy).toHaveBeenCalledWith("read_file", { path: "test2.txt" }); + expect(chunks).toContainEqual(expect.objectContaining({ type: 'tool_result', id: 'call_1', content: "Mocked tool result" })); + expect(chunks).toContainEqual(expect.objectContaining({ type: 'tool_result', id: 'call_2', content: "Mocked tool result" })); + expect(chunks).toContainEqual(expect.objectContaining({ type: 'text', content: "Both tools executed successfully" })); + expect(chunks).toContainEqual(expect.objectContaining({ type: 'done' })); }); });