Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
224 changes: 178 additions & 46 deletions crates/aof-runtime/src/executor/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,13 +130,29 @@ impl Runtime {
let mut all_mcp_servers = config.mcp_servers.clone();
all_mcp_servers.extend(type_based_mcp_servers);

// Create combined executor with builtin tools and MCP
if !builtin_tool_names.is_empty() {
info!("Creating combined executor: builtin={:?}, mcp_servers={}", builtin_tool_names, all_mcp_servers.len());
// For now, prioritize MCP if present
Some(self.create_mcp_executor_from_config(&all_mcp_servers).await?)
// Try to create MCP executor, but fall back to builtin tools if MCP fails
let mcp_executor = self.create_mcp_executor_from_config(&all_mcp_servers).await?;

if let Some(mcp_exec) = mcp_executor {
if !builtin_tool_names.is_empty() {
// Create combined executor with both builtin and MCP tools
info!("Creating combined executor: builtin={:?}, mcp available", builtin_tool_names);
let builtin_exec = self.create_system_executor(&builtin_tool_names)?;
Some(Arc::new(CombinedToolExecutor {
primary: builtin_exec,
secondary: Some(mcp_exec),
}))
} else {
Some(mcp_exec)
}
} else if !builtin_tool_names.is_empty() {
// MCP failed, but we have builtin tools - use those
info!("MCP initialization failed, using builtin tools only: {:?}", builtin_tool_names);
Some(self.create_system_executor(&builtin_tool_names)?)
} else {
Some(self.create_mcp_executor_from_config(&all_mcp_servers).await?)
// No MCP and no builtin tools
warn!("No tools available: MCP initialization failed and no builtin tools configured");
None
}
} else if !builtin_tool_names.is_empty() {
info!("Creating system executor for type-based tools: {:?}", builtin_tool_names);
Expand All @@ -147,7 +163,13 @@ impl Runtime {
} else if !config.mcp_servers.is_empty() {
// Use the new flexible MCP configuration
info!("Using MCP servers for tools");
Some(self.create_mcp_executor_from_config(&config.mcp_servers).await?)
match self.create_mcp_executor_from_config(&config.mcp_servers).await? {
Some(executor) => Some(executor),
None => {
warn!("MCP servers configured but none could be initialized");
None
}
}
} else if !config.tools.is_empty() {
// Separate built-in tools from MCP tools
let builtin_tools: Vec<&str> = config.tools.iter()
Expand Down Expand Up @@ -518,28 +540,37 @@ impl Runtime {
}

// Helper: Create MCP executor from flexible config
// Returns None if no MCP servers could be initialized (graceful degradation)
async fn create_mcp_executor_from_config(
&self,
mcp_servers: &[McpServerConfig],
) -> AofResult<Arc<dyn ToolExecutor>> {
) -> AofResult<Option<Arc<dyn ToolExecutor>>> {
info!("Creating MCP executor from {} server configs", mcp_servers.len());

let mut clients: Vec<Arc<aof_mcp::McpClient>> = Vec::new();
let mut all_tool_names: Vec<String> = Vec::new();
let mut initialization_errors: Vec<String> = Vec::new();

for server_config in mcp_servers {
// Validate the config
if let Err(e) = server_config.validate() {
warn!("Invalid MCP server config '{}': {}", server_config.name, e);
initialization_errors.push(format!("{}: {}", server_config.name, e));
continue;
}

info!("Initializing MCP server: {} ({:?})", server_config.name, server_config.transport);

let mcp_client = match server_config.transport {
McpTransport::Stdio => {
let command = server_config.command.as_ref()
.ok_or_else(|| AofError::config("Stdio transport requires command"))?;
let command = match server_config.command.as_ref() {
Some(cmd) => cmd,
None => {
warn!("MCP server '{}': Stdio transport requires command", server_config.name);
initialization_errors.push(format!("{}: Stdio transport requires command", server_config.name));
continue;
}
};

let mut builder = McpClientBuilder::new()
.stdio(command.clone(), server_config.args.clone());
Expand All @@ -549,46 +580,66 @@ impl Runtime {
builder = builder.with_env(key.clone(), value.clone());
}

builder.build()
.map_err(|e| AofError::tool(format!(
"Failed to create MCP client for '{}': {}", server_config.name, e
)))?
match builder.build() {
Ok(client) => client,
Err(e) => {
warn!("Failed to create MCP client for '{}': {}", server_config.name, e);
initialization_errors.push(format!("{}: {}", server_config.name, e));
continue;
}
}
}
#[cfg(feature = "sse")]
McpTransport::Sse => {
let endpoint = server_config.endpoint.as_ref()
.ok_or_else(|| AofError::config("SSE transport requires endpoint"))?;

McpClientBuilder::new()
.sse(endpoint.clone())
.build()
.map_err(|e| AofError::tool(format!(
"Failed to create SSE MCP client for '{}': {}", server_config.name, e
)))?
let endpoint = match server_config.endpoint.as_ref() {
Some(ep) => ep,
None => {
warn!("MCP server '{}': SSE transport requires endpoint", server_config.name);
initialization_errors.push(format!("{}: SSE transport requires endpoint", server_config.name));
continue;
}
};

match McpClientBuilder::new().sse(endpoint.clone()).build() {
Ok(client) => client,
Err(e) => {
warn!("Failed to create SSE MCP client for '{}': {}", server_config.name, e);
initialization_errors.push(format!("{}: {}", server_config.name, e));
continue;
}
}
}
#[cfg(feature = "http")]
McpTransport::Http => {
let endpoint = server_config.endpoint.as_ref()
.ok_or_else(|| AofError::config("HTTP transport requires endpoint"))?;

McpClientBuilder::new()
.http(endpoint.clone())
.build()
.map_err(|e| AofError::tool(format!(
"Failed to create HTTP MCP client for '{}': {}", server_config.name, e
)))?
let endpoint = match server_config.endpoint.as_ref() {
Some(ep) => ep,
None => {
warn!("MCP server '{}': HTTP transport requires endpoint", server_config.name);
initialization_errors.push(format!("{}: HTTP transport requires endpoint", server_config.name));
continue;
}
};

match McpClientBuilder::new().http(endpoint.clone()).build() {
Ok(client) => client,
Err(e) => {
warn!("Failed to create HTTP MCP client for '{}': {}", server_config.name, e);
initialization_errors.push(format!("{}: {}", server_config.name, e));
continue;
}
}
}
#[cfg(not(feature = "sse"))]
McpTransport::Sse => {
return Err(AofError::config(
"SSE transport not enabled. Enable the 'sse' feature in aof-mcp"
));
warn!("MCP server '{}': SSE transport not enabled", server_config.name);
initialization_errors.push(format!("{}: SSE transport not enabled", server_config.name));
continue;
}
#[cfg(not(feature = "http"))]
McpTransport::Http => {
return Err(AofError::config(
"HTTP transport not enabled. Enable the 'http' feature in aof-mcp"
));
warn!("MCP server '{}': HTTP transport not enabled", server_config.name);
initialization_errors.push(format!("{}: HTTP transport not enabled", server_config.name));
continue;
}
};

Expand All @@ -611,25 +662,27 @@ impl Runtime {
}
Err(e) => {
warn!("Failed to initialize MCP server '{}': {}", server_config.name, e);
if !server_config.auto_reconnect {
return Err(AofError::tool(format!(
"MCP server '{}' initialization failed: {}", server_config.name, e
)));
}
initialization_errors.push(format!("{}: {}", server_config.name, e));
// Continue to next server instead of failing entirely
}
}
}

if clients.is_empty() {
return Err(AofError::tool("No MCP servers could be initialized"));
// Log all errors but return None for graceful degradation
warn!(
"No MCP servers could be initialized. Errors: {:?}. Agent will continue without MCP tools.",
initialization_errors
);
return Ok(None);
}

info!("MCP executor created with {} servers and {} tools", clients.len(), all_tool_names.len());

Ok(Arc::new(MultiMcpToolExecutor {
Ok(Some(Arc::new(MultiMcpToolExecutor {
clients,
tool_names: all_tool_names,
}))
})))
}

// Helper: Create system tool executor for shell/kubectl commands
Expand Down Expand Up @@ -856,6 +909,61 @@ impl ToolExecutor for MultiMcpToolExecutor {
}
}

/// Combined tool executor that wraps multiple executors
/// Tries primary executor first, then secondary if tool not found
struct CombinedToolExecutor {
primary: Arc<dyn ToolExecutor>,
secondary: Option<Arc<dyn ToolExecutor>>,
}

#[async_trait]
impl ToolExecutor for CombinedToolExecutor {
async fn execute_tool(
&self,
name: &str,
input: ToolInput,
) -> AofResult<aof_core::ToolResult> {
debug!("Executing tool '{}' via combined executor", name);

// Check if primary executor has this tool
let primary_tools: std::collections::HashSet<_> = self.primary.list_tools()
.iter()
.map(|t| t.name.clone())
.collect();

if primary_tools.contains(name) {
return self.primary.execute_tool(name, input).await;
}

// Try secondary executor if available
if let Some(ref secondary) = self.secondary {
return secondary.execute_tool(name, input).await;
}

// Tool not found in any executor
Ok(aof_core::ToolResult {
success: false,
data: serde_json::json!({}),
error: Some(format!("Tool '{}' not found in any executor", name)),
execution_time_ms: 0,
})
}

fn list_tools(&self) -> Vec<ToolDefinition> {
let mut tools = self.primary.list_tools();
if let Some(ref secondary) = self.secondary {
tools.extend(secondary.list_tools());
}
tools
}

fn get_tool(&self, name: &str) -> Option<Arc<dyn Tool>> {
self.primary.get_tool(name).or_else(|| {
self.secondary.as_ref().and_then(|s| s.get_tool(name))
})
}
}

/// Helper function to create a BuiltinToolExecutor from aof-tools
fn create_builtin_executor_for_tools(tool_names: &[String]) -> Arc<dyn ToolExecutor> {
use aof_tools::ToolRegistry;
Expand Down Expand Up @@ -1266,4 +1374,28 @@ mod tests {
assert_eq!(model_config.provider, ModelProvider::Anthropic);
assert_eq!(model_config.model, "gpt-4");
}

#[tokio::test]
async fn test_mcp_executor_graceful_failure() {
let runtime = Runtime::new();

// Create MCP config with non-existent command
let mcp_servers = vec![McpServerConfig {
name: "non-existent-mcp".to_string(),
transport: McpTransport::Stdio,
command: Some("non-existent-command-that-does-not-exist".to_string()),
args: vec![],
endpoint: None,
env: Default::default(),
tools: vec![],
timeout_secs: 5,
auto_reconnect: false,
init_options: None,
}];

// Should return None instead of error (graceful degradation)
let result = runtime.create_mcp_executor_from_config(&mcp_servers).await;
assert!(result.is_ok(), "Should not return error for failed MCP init");
assert!(result.unwrap().is_none(), "Should return None when no MCP servers initialize");
}
}
28 changes: 28 additions & 0 deletions docs/reference/agent-spec.md
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,34 @@ spec:

For more details, see [MCP Integration Guide](../tools/mcp-integration.md).

### Graceful Degradation

When MCP servers fail to initialize (e.g., unavailable server, network issues, missing packages), the agent will:

1. **Log a warning** with detailed error information
2. **Continue loading** with any successfully initialized tools
3. **Fall back to builtin tools** if configured alongside MCP

This ensures agents remain functional even when some external tools are unavailable.

**Example with fallback:**
```yaml
spec:
tools:
# Builtin Shell tool - always available
- type: Shell
config:
allowed_commands: [kubectl, helm]

# MCP tool - optional, agent continues if unavailable
- type: MCP
config:
name: kubernetes-mcp
command: ["npx", "-y", "@example/mcp-server-kubernetes"]
```

If the MCP server fails to start, the agent will still load with the Shell tool available.

---

## Memory Configuration
Expand Down