Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

138 changes: 99 additions & 39 deletions src/agents/BaseAgent.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { Anthropic } from "@anthropic-ai/sdk";
import pRetry from "p-retry";
import { logger } from "../config/logger.js";
import { AnthropicClient } from "../llm/AnthropicClient.js";

Expand All @@ -26,51 +25,112 @@ export abstract class BaseAgent {
this.client = new AnthropicClient();
}

abstract executeTool(name: string, input: any): Promise<string>;

async *run(input: AgentInput): AsyncGenerator<AgentChunk> {
logger.info(`Agent ${this.name} starting run`);

try {
yield* this.executeWithRetry(input);
} catch (error: any) {
logger.error(`Agent ${this.name} failed`, error);
yield { type: "error", content: error.message };
}
}
const messages = [...input.messages];
let isRunning = true;

while (isRunning) {
const stream = this.client.stream(
messages,
this.systemPrompt,
this.tools.length > 0 ? this.tools : undefined
);

const toolCalls: Map<string, { id: string; name: string; input: string }> = new Map();
let fullResponse = "";

for await (const event of stream) {
if (event.type === 'content_block_delta' && event.delta.type === 'text_delta') {
fullResponse += event.delta.text;
yield { type: 'text', content: event.delta.text };
} else if (event.type === 'content_block_start' && event.content_block.type === 'tool_use') {
const call = {
id: event.content_block.id,
name: event.content_block.name,
input: ""
};
toolCalls.set(event.index.toString(), call);
yield {
type: 'tool_use',
name: call.name,
id: call.id,
input: {}
};
} else if (event.type === 'content_block_delta' && event.delta.type === 'input_json_delta') {
const call = toolCalls.get(event.index.toString());
if (call) {
call.input += event.delta.partial_json;
yield { type: 'tool_use', content: event.delta.partial_json, id: 'delta' };
}
} else if (event.type === 'message_stop') {
// Handled outside the loop
}
}

private async *executeWithRetry(input: AgentInput): AsyncGenerator<AgentChunk> {
// We can't easily retry a generator, so we retry the logic that sets it up
// But since we are streaming, retrying usually happens at the Orchestrator level
// This method proxies the stream from AnthropicClient

// For now, we simply proxy the stream.
// Real retry logic for streaming is complex (need to buffer or restart).
// Orchestrator will handle task-level retries.

const stream = this.client.stream(
input.messages,
this.systemPrompt,
this.tools.length > 0 ? this.tools : undefined
);

for await (const event of stream) {
if (event.type === 'content_block_delta' && event.delta.type === 'text_delta') {
yield { type: 'text', content: event.delta.text };
} else if (event.type === 'content_block_start' && event.content_block.type === 'tool_use') {
yield {
type: 'tool_use',
name: event.content_block.name,
id: event.content_block.id,
input: {} // Input builds up in deltas
// Construct the assistant's message for context
const assistantMessage: Anthropic.MessageParam = {
role: 'assistant',
content: []
};
} else if (event.type === 'content_block_delta' && event.delta.type === 'input_json_delta') {
// We need to handle JSON delta accumulation in the Orchestrator or here.
// For simplicity, let's yield raw chunks and let Orchestrator assemble?
// Or better, let's assemble here if we want a clean interface.
// To keep it "blazing fast", we yield small chunks.
yield { type: 'tool_use', content: event.delta.partial_json, id: 'delta' }; // flagging as delta
} else if (event.type === 'message_stop') {
yield { type: 'done' };

if (fullResponse) {
(assistantMessage.content as any).push({ type: 'text', text: fullResponse });
}

if (toolCalls.size > 0) {
const toolResults: Anthropic.ToolResultBlockParam[] = [];
for (const call of toolCalls.values()) {
const toolInput = JSON.parse(call.input || "{}");
(assistantMessage.content as any).push({
type: 'tool_use',
id: call.id,
name: call.name,
input: toolInput
});

// Execute tool
logger.info(`Agent ${this.name} executing tool ${call.name}`);
try {
const result = await this.executeTool(call.name, toolInput);
yield { type: 'tool_result', name: call.name, content: result, id: call.id };

toolResults.push({
type: 'tool_result',
tool_use_id: call.id,
content: result
});
} catch (error: any) {
const errorMsg = `Error executing tool: ${error.message}`;
yield { type: 'error', content: errorMsg };
toolResults.push({
type: 'tool_result',
tool_use_id: call.id,
content: errorMsg,
is_error: true
});
}
}

messages.push(assistantMessage);
messages.push({
role: 'user',
content: toolResults
});
// Continue loop to get LLM's reaction to tool result(s)
} else {
// No tool calls, we are done
isRunning = false;
yield { type: 'done' };
}
}
} catch (error: any) {
logger.error(`Agent ${this.name} failed`, error);
yield { type: "error", content: error.message };
}
}
}
2 changes: 1 addition & 1 deletion src/agents/CodeReaderAgent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import Anthropic from "@anthropic-ai/sdk";

export class CodeReaderAgent extends BaseAgent {
name = "CodeReader";
systemPrompt = "You are a code reader. specific files or search for files to read.";
systemPrompt = "You are a code reader. Use tools to read specific files or search for files to read.";
private astParser = new ASTParser();

tools: Anthropic.Tool[] = [
Expand Down
7 changes: 5 additions & 2 deletions src/agents/ExplainerAgent.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { BaseAgent, AgentInput } from "./BaseAgent.js";
import { AnthropicClient } from "../llm/AnthropicClient.js";
import { BaseAgent } from "./BaseAgent.js";

export class ExplainerAgent extends BaseAgent {
name = "Explainer";
Expand All @@ -9,4 +8,8 @@ export class ExplainerAgent extends BaseAgent {
constructor() {
super();
}

async executeTool(name: string, input: any): Promise<string> {
throw new Error(`Tool ${name} not found`);
}
}
100 changes: 44 additions & 56 deletions src/orchestrator/OrchestratorAgent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,79 +41,67 @@ export class OrchestratorAgent {
}

// 2. Execute Agents (Parallel if multiple)
const promises = classifications.map(async (cls) => {
const iterators = classifications.map((cls) => {
const agent = this.registry.get(cls.agent);
if (!agent) {
return [{ type: "error", content: `Agent ${cls.agent} not found` } as AgentChunk];
return (async function* () {
yield { type: "error", content: `Agent ${cls.agent} not found` } as AgentChunk;
})();
}

const chunks: AgentChunk[] = [];
try {
// If it's the Planner, we need to handle it specifically
// But for now, let's just run it. The output will be the plan.
// If we want to AUTO-EXECUTE the plan, we need to intercept.

return (async function* () {
const stream = agent.run({
messages: [{ role: 'user', content: userInput }]
});

for await (const chunk of stream) {
chunks.push(chunk);
}

// If Planner, check for plan
if (cls.agent === "Planner") {
const planner = agent as PlannerAgent;
if (planner.latestPlan) {
chunks.push({ type: "text", content: "\nExecuting Plan...\n" });
// Execute steps sequentially
for (const step of planner.latestPlan) {
chunks.push({ type: "text", content: `\n> Step: ${step.agent} - ${step.instruction}\n` });
// Recursive dispatch? Or just get agent and run?
// Recursive dispatch is safer as it goes through classification?
// No, we know the agent.
const stepAgent = this.registry.get(step.agent);
if (stepAgent) {
const stepStream = stepAgent.run({
messages: [{ role: 'user', content: step.instruction }]
});
for await (const stepChunk of stepStream) {
chunks.push(stepChunk);
yield chunk;

// If Planner, check for plan
if (chunk.type === "done" && cls.agent === "Planner") {
const planner = agent as PlannerAgent;
if (planner.latestPlan) {
yield { type: "text", content: "\nExecuting Plan...\n" };
// Execute steps sequentially
for (const step of planner.latestPlan) {
yield { type: "text", content: `\n> Step: ${step.agent} - ${step.instruction}\n` };
const stepAgent = this.registry.get(step.agent);
if (stepAgent) {
const stepStream = stepAgent.run({
messages: [{ role: 'user', content: step.instruction }]
});
for await (const stepChunk of stepStream) {
yield stepChunk;
}
} else {
yield { type: "error", content: `Agent ${step.agent} not found for step.` };
}
} else {
chunks.push({ type: "error", content: `Agent ${step.agent} not found for step.` });
}
// Clear plan
planner.latestPlan = null;
}
// Clear plan
planner.latestPlan = null;
}
}

return chunks;

} catch (error: any) {
logger.error(`Execution failed for ${cls.agent}`, error);
return [{ type: "error", content: error.message } as AgentChunk];
}
})();
});

// Resolve all promises (Parallel Execution)
// Note: yielding from parallel promises is tricky for order,
// but for "simultaneous", we want to yield as they come in?
// Generator can't await multiple things in parallel and yield mixed stream easily without helper.
// For simplicity, we'll await all and then yield (which isn't streaming).
// OR we use a helper to race/merge streams.
// Given current architecture, let's await all for "fast" mode batching,
// or just execute sequentially if we want true streaming?
// The user asked for "simultaneously".

// Simple parallel implementation: Wait for all, then stream results combined.
// True streaming parallel is harder in a single generator.
// Merge streams in parallel
yield* this.mergeStreams(iterators);
}

const results = await Promise.all(promises);
for (const resultChunks of results) {
for (const chunk of resultChunks) {
yield chunk;
private async *mergeStreams(iterators: AsyncGenerator<AgentChunk>[]): AsyncGenerator<AgentChunk> {
const nexts = iterators.map(async (it) => ({ it, next: await it.next() }));
const pending = new Set(nexts);

while (pending.size > 0) {
const finished = await Promise.race(Array.from(pending));
if (finished.next.done) {
pending.delete(finished);
} else {
yield finished.next.value;
// Replace the promise with the next one from the same iterator
pending.delete(finished);
pending.add(finished.it.next().then(next => ({ it: finished.it, next })));
}
}
}
Expand Down
Loading