From 589e3633848c1bff9385c43133132171897ab7fb Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 23 May 2026 22:18:49 +0800 Subject: [PATCH] feat(core): expose subagent task tracker for delegated runs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a materialized view over the existing SubagentStart/Progress/End event stream so callers can query the lifecycle of delegated child runs by task id — without scanning run_events() — making automatic parallel delegation observable like a task dashboard. - New `subagent_task_tracker` module with `SubagentTaskSnapshot` + `InMemorySubagentTaskTracker`; populated from runtime events via the existing `RuntimeEventSink::observe` path. - `AgentSession` gains `subagent_task(id)`, `subagent_tasks()`, and `pending_subagent_tasks()` query APIs, scoped to the parent session. - Fixes two pre-existing bugs in `TaskExecutor`: 1. `SubagentStart` now carries the real parent session id instead of `String::new()`, so tracker entries are correctly associated with the originating session. 2. `execute_background` now returns the same task id used in emitted events (previously it generated a throwaway id and `execute` internally generated a different one), and pre-emits `SubagentStart` synchronously so callers can query the tracker immediately after the background task is scheduled. Event stream remains the authoritative record; the tracker is a read-side projection that can be rebuilt by replaying events. --- core/src/agent_api.rs | 30 ++ core/src/agent_api/runtime_events.rs | 9 +- core/src/agent_api/session_builder.rs | 1 + core/src/agent_api/tests.rs | 93 +++++ core/src/lib.rs | 4 + core/src/subagent_task_tracker.rs | 336 ++++++++++++++++++ core/src/tools/task.rs | 113 ++++-- .../tests/test_task_permission_inheritance.rs | 4 +- 8 files changed, 565 insertions(+), 25 deletions(-) create mode 100644 core/src/subagent_task_tracker.rs diff --git a/core/src/agent_api.rs b/core/src/agent_api.rs index b392a87..0d5c09c 100644 --- a/core/src/agent_api.rs +++ b/core/src/agent_api.rs @@ -429,6 +429,8 @@ pub struct AgentSession { current_run_id: Arc>>, /// In-memory run snapshots and event replay buffer for this session. run_store: Arc, + /// Materialized view of delegated subagent task lifecycle, populated from runtime events. + subagent_tasks: Arc, /// Currently executing tools observed from runtime events. active_tools: Arc>>, /// Compact execution traces for this session. @@ -564,6 +566,34 @@ impl AgentSession { SessionView::from_session(self).active_tools().await } + /// Look up a delegated subagent task by id. Returns `None` if no such task + /// has been observed in this session. + pub async fn subagent_task( + &self, + task_id: &str, + ) -> Option { + self.subagent_tasks.get(task_id).await + } + + /// Return snapshots of every delegated subagent task observed in this + /// session (including completed and failed ones), oldest first. + pub async fn subagent_tasks(&self) -> Vec { + self.subagent_tasks.list_for_parent(&self.session_id).await + } + + /// Return snapshots of subagent tasks still in `Running` state. + pub async fn pending_subagent_tasks( + &self, + ) -> Vec { + use crate::subagent_task_tracker::SubagentStatus; + self.subagent_tasks + .list_for_parent(&self.session_id) + .await + .into_iter() + .filter(|task| task.status == SubagentStatus::Running) + .collect() + } + /// Return a snapshot of the session's conversation history. pub fn history(&self) -> Vec { SessionView::from_session(self).history() diff --git a/core/src/agent_api/runtime_events.rs b/core/src/agent_api/runtime_events.rs index 518ae56..5801375 100644 --- a/core/src/agent_api/runtime_events.rs +++ b/core/src/agent_api/runtime_events.rs @@ -47,6 +47,7 @@ pub(super) struct RuntimeEventSink { session_id: String, hook_executor: Option>, active_tools: ActiveToolMap, + subagent_tasks: Arc, } impl RuntimeEventSink { @@ -57,6 +58,7 @@ impl RuntimeEventSink { session.session_id.clone(), session.ahp_executor.clone(), Arc::clone(&session.active_tools), + Arc::clone(&session.subagent_tasks), ) } @@ -66,6 +68,7 @@ impl RuntimeEventSink { session_id: String, hook_executor: Option>, active_tools: ActiveToolMap, + subagent_tasks: Arc, ) -> Self { Self { run_store, @@ -73,6 +76,7 @@ impl RuntimeEventSink { session_id, hook_executor, active_tools, + subagent_tasks, } } @@ -106,7 +110,7 @@ impl RuntimeEventSink { }) } - async fn observe(&self, event: &AgentEvent) { + pub(super) async fn observe(&self, event: &AgentEvent) { let _ = self .run_store .record_event(&self.run_id, event.clone()) @@ -116,6 +120,7 @@ impl RuntimeEventSink { .record_agent_event(event, &self.run_id, &self.session_id) .await; } + self.subagent_tasks.record_event(event).await; self.apply(event).await; } @@ -200,6 +205,7 @@ mod tests { "session-1".to_string(), None, Arc::clone(&active_tools), + Arc::new(crate::subagent_task_tracker::InMemorySubagentTaskTracker::new()), ); sink.observe(&AgentEvent::ToolStart { @@ -239,6 +245,7 @@ mod tests { "session-1".to_string(), None, active_tools(), + Arc::new(crate::subagent_task_tracker::InMemorySubagentTaskTracker::new()), ); sink.observe(&AgentEvent::TextDelta { diff --git a/core/src/agent_api/session_builder.rs b/core/src/agent_api/session_builder.rs index fb9f96c..5d4be97 100644 --- a/core/src/agent_api/session_builder.rs +++ b/core/src/agent_api/session_builder.rs @@ -219,6 +219,7 @@ pub(super) fn build_agent_session( cancel_token: Arc::new(tokio::sync::Mutex::new(None)), current_run_id: Arc::new(tokio::sync::Mutex::new(None)), run_store: Arc::new(crate::run::InMemoryRunStore::new()), + subagent_tasks: Arc::new(crate::subagent_task_tracker::InMemorySubagentTaskTracker::new()), active_tools: Arc::new(tokio::sync::RwLock::new(HashMap::new())), trace_sink, verification_reports: Arc::new(RwLock::new(Vec::new())), diff --git a/core/src/agent_api/tests.rs b/core/src/agent_api/tests.rs index a4f0014..7a9b835 100644 --- a/core/src/agent_api/tests.rs +++ b/core/src/agent_api/tests.rs @@ -2324,3 +2324,96 @@ fn test_session_command_is_available_from_queue_module() { use crate::queue::SessionCommand; let _ = std::marker::PhantomData::>; } + +#[tokio::test] +async fn subagent_events_populate_session_tracker() { + use super::runtime_events::RuntimeEventSink; + use crate::agent::AgentEvent; + use crate::subagent_task_tracker::SubagentStatus; + + let agent = Agent::from_config(test_config()).await.unwrap(); + let session = agent + .session("/tmp/test-ws-subagent-tracker", None) + .unwrap(); + + // Drive a synthetic subagent lifecycle through the session's runtime sink. + let run = session + .run_store + .create_run(session.session_id(), "parent prompt") + .await; + let sink = RuntimeEventSink::from_session(&session, &run.id); + + let task_id = "task-test-1".to_string(); + let child_session_id = format!("task-run-{}", task_id); + + sink.observe(&AgentEvent::SubagentStart { + task_id: task_id.clone(), + session_id: child_session_id.clone(), + parent_session_id: session.session_id().to_string(), + agent: "explore".to_string(), + description: "demo delegation".to_string(), + }) + .await; + + let snap = session + .subagent_task(&task_id) + .await + .expect("running task should be visible"); + assert_eq!(snap.status, SubagentStatus::Running); + assert_eq!(snap.parent_session_id, session.session_id()); + assert_eq!(snap.child_session_id, child_session_id); + assert_eq!(snap.agent, "explore"); + assert!(snap.finished_ms.is_none()); + + let pending = session.pending_subagent_tasks().await; + assert_eq!(pending.len(), 1); + assert_eq!(pending[0].task_id, task_id); + + sink.observe(&AgentEvent::SubagentEnd { + task_id: task_id.clone(), + session_id: child_session_id, + agent: "explore".to_string(), + output: "found things".to_string(), + success: true, + }) + .await; + + let snap = session.subagent_task(&task_id).await.unwrap(); + assert_eq!(snap.status, SubagentStatus::Completed); + assert_eq!(snap.success, Some(true)); + assert_eq!(snap.output.as_deref(), Some("found things")); + assert!(snap.finished_ms.is_some()); + + assert!(session.pending_subagent_tasks().await.is_empty()); + assert_eq!(session.subagent_tasks().await.len(), 1); +} + +#[tokio::test] +async fn subagent_tasks_scope_to_parent_session() { + use super::runtime_events::RuntimeEventSink; + use crate::agent::AgentEvent; + + let agent = Agent::from_config(test_config()).await.unwrap(); + let session_a = agent.session("/tmp/test-ws-subagent-a", None).unwrap(); + let session_b = agent.session("/tmp/test-ws-subagent-b", None).unwrap(); + + let run = session_a + .run_store + .create_run(session_a.session_id(), "p") + .await; + let sink = RuntimeEventSink::from_session(&session_a, &run.id); + + sink.observe(&AgentEvent::SubagentStart { + task_id: "task-from-a".to_string(), + session_id: "task-run-task-from-a".to_string(), + parent_session_id: session_a.session_id().to_string(), + agent: "explore".to_string(), + description: "isolated".to_string(), + }) + .await; + + // session A sees the task; session B has its own (empty) tracker. + assert_eq!(session_a.subagent_tasks().await.len(), 1); + assert!(session_b.subagent_tasks().await.is_empty()); + assert!(session_b.subagent_task("task-from-a").await.is_none()); +} diff --git a/core/src/lib.rs b/core/src/lib.rs index c1e1dc5..e5aee07 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -106,6 +106,7 @@ pub(crate) mod session_lane_queue; pub mod skills; pub mod store; pub mod subagent; +pub mod subagent_task_tracker; pub mod telemetry; #[cfg(feature = "telemetry")] pub mod telemetry_otel; @@ -138,6 +139,9 @@ pub use subagent::{ AgentDefinition, AgentRegistry, CattleAgentKind, CattleAgentSpec, ConfirmationInheritance, WorkerAgentKind, WorkerAgentSpec, }; +pub use subagent_task_tracker::{ + InMemorySubagentTaskTracker, SubagentProgressEntry, SubagentStatus, SubagentTaskSnapshot, +}; pub use tools::ToolErrorKind; pub use workspace::{ CommandOutput, CommandOutputObserver, CommandRequest, LocalWorkspaceBackend, RemoteGitBackend, diff --git a/core/src/subagent_task_tracker.rs b/core/src/subagent_task_tracker.rs new file mode 100644 index 0000000..5f174f6 --- /dev/null +++ b/core/src/subagent_task_tracker.rs @@ -0,0 +1,336 @@ +//! In-memory tracker for delegated subagent tasks. +//! +//! Materializes a queryable view of subagent task lifecycle from the +//! `AgentEvent` stream. The event stream remains the authoritative record; +//! this module exists so callers can ask "what is task X doing right now?" +//! without scanning `run_events()`. + +use crate::agent::AgentEvent; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use tokio::sync::RwLock; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum SubagentStatus { + Running, + Completed, + Failed, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SubagentProgressEntry { + pub timestamp_ms: u64, + pub status: String, + pub metadata: serde_json::Value, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SubagentTaskSnapshot { + pub task_id: String, + pub parent_session_id: String, + pub child_session_id: String, + pub agent: String, + pub description: String, + pub status: SubagentStatus, + pub started_ms: u64, + pub updated_ms: u64, + #[serde(skip_serializing_if = "Option::is_none")] + pub finished_ms: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub output: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub success: Option, + pub progress: Vec, +} + +#[derive(Debug, Default)] +pub struct InMemorySubagentTaskTracker { + tasks: RwLock>, +} + +impl InMemorySubagentTaskTracker { + pub fn new() -> Self { + Self::default() + } + + /// Apply a single agent event to the tracker. Non-subagent events are ignored. + pub async fn record_event(&self, event: &AgentEvent) { + match event { + AgentEvent::SubagentStart { + task_id, + session_id, + parent_session_id, + agent, + description, + } => { + let now = now_ms(); + let mut tasks = self.tasks.write().await; + tasks + .entry(task_id.clone()) + .and_modify(|task| { + // Late start (e.g. background path) — keep the first-seen + // started_ms but refresh fields we now know. + task.parent_session_id = parent_session_id.clone(); + task.child_session_id = session_id.clone(); + task.agent = agent.clone(); + task.description = description.clone(); + task.updated_ms = now; + }) + .or_insert_with(|| SubagentTaskSnapshot { + task_id: task_id.clone(), + parent_session_id: parent_session_id.clone(), + child_session_id: session_id.clone(), + agent: agent.clone(), + description: description.clone(), + status: SubagentStatus::Running, + started_ms: now, + updated_ms: now, + finished_ms: None, + output: None, + success: None, + progress: Vec::new(), + }); + } + AgentEvent::SubagentProgress { + task_id, + session_id, + status, + metadata, + } => { + let now = now_ms(); + let mut tasks = self.tasks.write().await; + let entry = tasks + .entry(task_id.clone()) + .or_insert_with(|| SubagentTaskSnapshot { + task_id: task_id.clone(), + parent_session_id: String::new(), + child_session_id: session_id.clone(), + agent: String::new(), + description: String::new(), + status: SubagentStatus::Running, + started_ms: now, + updated_ms: now, + finished_ms: None, + output: None, + success: None, + progress: Vec::new(), + }); + entry.updated_ms = now; + entry.progress.push(SubagentProgressEntry { + timestamp_ms: now, + status: status.clone(), + metadata: metadata.clone(), + }); + } + AgentEvent::SubagentEnd { + task_id, + session_id, + agent, + output, + success, + } => { + let now = now_ms(); + let mut tasks = self.tasks.write().await; + let entry = tasks + .entry(task_id.clone()) + .or_insert_with(|| SubagentTaskSnapshot { + task_id: task_id.clone(), + parent_session_id: String::new(), + child_session_id: session_id.clone(), + agent: agent.clone(), + description: String::new(), + status: SubagentStatus::Running, + started_ms: now, + updated_ms: now, + finished_ms: None, + output: None, + success: None, + progress: Vec::new(), + }); + entry.status = if *success { + SubagentStatus::Completed + } else { + SubagentStatus::Failed + }; + entry.updated_ms = now; + entry.finished_ms = Some(now); + entry.output = Some(output.clone()); + entry.success = Some(*success); + } + _ => {} + } + } + + pub async fn get(&self, task_id: &str) -> Option { + self.tasks.read().await.get(task_id).cloned() + } + + pub async fn list(&self) -> Vec { + let mut tasks = self + .tasks + .read() + .await + .values() + .cloned() + .collect::>(); + tasks.sort_by_key(|task| task.started_ms); + tasks + } + + pub async fn list_pending(&self) -> Vec { + self.list() + .await + .into_iter() + .filter(|task| task.status == SubagentStatus::Running) + .collect() + } + + pub async fn list_for_parent(&self, parent_session_id: &str) -> Vec { + self.list() + .await + .into_iter() + .filter(|task| task.parent_session_id == parent_session_id) + .collect() + } +} + +fn now_ms() -> u64 { + use std::time::{SystemTime, UNIX_EPOCH}; + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_millis() as u64) + .unwrap_or(0) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn start_event(task_id: &str, parent: &str, child: &str) -> AgentEvent { + AgentEvent::SubagentStart { + task_id: task_id.to_string(), + session_id: child.to_string(), + parent_session_id: parent.to_string(), + agent: "explore".to_string(), + description: "find things".to_string(), + } + } + + fn progress_event(task_id: &str, child: &str, status: &str) -> AgentEvent { + AgentEvent::SubagentProgress { + task_id: task_id.to_string(), + session_id: child.to_string(), + status: status.to_string(), + metadata: serde_json::json!({}), + } + } + + fn end_event(task_id: &str, child: &str, success: bool) -> AgentEvent { + AgentEvent::SubagentEnd { + task_id: task_id.to_string(), + session_id: child.to_string(), + agent: "explore".to_string(), + output: "done".to_string(), + success, + } + } + + #[tokio::test] + async fn lifecycle_start_progress_end_transitions_status() { + let tracker = InMemorySubagentTaskTracker::new(); + + tracker + .record_event(&start_event("task-1", "parent", "child")) + .await; + let snap = tracker.get("task-1").await.unwrap(); + assert_eq!(snap.status, SubagentStatus::Running); + assert_eq!(snap.parent_session_id, "parent"); + assert_eq!(snap.child_session_id, "child"); + assert!(snap.finished_ms.is_none()); + + tracker + .record_event(&progress_event("task-1", "child", "tool_completed: bash")) + .await; + let snap = tracker.get("task-1").await.unwrap(); + assert_eq!(snap.status, SubagentStatus::Running); + assert_eq!(snap.progress.len(), 1); + + tracker + .record_event(&end_event("task-1", "child", true)) + .await; + let snap = tracker.get("task-1").await.unwrap(); + assert_eq!(snap.status, SubagentStatus::Completed); + assert_eq!(snap.success, Some(true)); + assert_eq!(snap.output.as_deref(), Some("done")); + assert!(snap.finished_ms.is_some()); + } + + #[tokio::test] + async fn failed_end_event_marks_status_failed() { + let tracker = InMemorySubagentTaskTracker::new(); + tracker + .record_event(&start_event("task-2", "parent", "child")) + .await; + tracker + .record_event(&end_event("task-2", "child", false)) + .await; + let snap = tracker.get("task-2").await.unwrap(); + assert_eq!(snap.status, SubagentStatus::Failed); + assert_eq!(snap.success, Some(false)); + } + + #[tokio::test] + async fn pending_list_excludes_completed_tasks() { + let tracker = InMemorySubagentTaskTracker::new(); + tracker + .record_event(&start_event("task-a", "parent", "child-a")) + .await; + tracker + .record_event(&start_event("task-b", "parent", "child-b")) + .await; + tracker + .record_event(&end_event("task-a", "child-a", true)) + .await; + + let pending = tracker.list_pending().await; + assert_eq!(pending.len(), 1); + assert_eq!(pending[0].task_id, "task-b"); + } + + #[tokio::test] + async fn list_for_parent_filters_by_session() { + let tracker = InMemorySubagentTaskTracker::new(); + tracker + .record_event(&start_event("task-a", "session-1", "child-a")) + .await; + tracker + .record_event(&start_event("task-b", "session-2", "child-b")) + .await; + + let mine = tracker.list_for_parent("session-1").await; + assert_eq!(mine.len(), 1); + assert_eq!(mine[0].task_id, "task-a"); + } + + #[tokio::test] + async fn end_before_start_still_records_terminal_state() { + let tracker = InMemorySubagentTaskTracker::new(); + tracker + .record_event(&end_event("task-late", "child", true)) + .await; + let snap = tracker.get("task-late").await.unwrap(); + assert_eq!(snap.status, SubagentStatus::Completed); + } + + #[tokio::test] + async fn non_subagent_events_are_ignored() { + let tracker = InMemorySubagentTaskTracker::new(); + tracker + .record_event(&AgentEvent::TextDelta { + text: "ignore me".to_string(), + }) + .await; + assert!(tracker.list().await.is_empty()); + } +} diff --git a/core/src/tools/task.rs b/core/src/tools/task.rs index bb77c02..1ef612a 100644 --- a/core/src/tools/task.rs +++ b/core/src/tools/task.rs @@ -191,12 +191,32 @@ impl TaskExecutor { } /// Execute a task by spawning an isolated child AgentLoop. + /// + /// `parent_session_id` flows into the emitted `SubagentStart`/`SubagentEnd` + /// events so dashboards can associate child runs with the parent session. pub async fn execute( &self, params: TaskParams, event_tx: Option>, + parent_session_id: Option<&str>, ) -> Result { let task_id = format!("task-{}", uuid::Uuid::new_v4()); + self.execute_with_task_id(task_id, params, event_tx, parent_session_id, true) + .await + } + + /// Execute a task using a caller-supplied task id. Used by `execute_background` + /// so the synchronously-returned task id matches the one in lifecycle events. + /// When `emit_start` is `false` the caller is responsible for emitting + /// `SubagentStart` themselves (e.g. to avoid a race against a tracker query). + pub async fn execute_with_task_id( + &self, + task_id: String, + params: TaskParams, + event_tx: Option>, + parent_session_id: Option<&str>, + emit_start: bool, + ) -> Result { let session_id = format!("task-run-{}", task_id); let agent = self @@ -204,14 +224,16 @@ impl TaskExecutor { .get(¶ms.agent) .context(format!("Unknown agent type: '{}'", params.agent))?; - if let Some(ref tx) = event_tx { - let _ = tx.send(AgentEvent::SubagentStart { - task_id: task_id.clone(), - session_id: session_id.clone(), - parent_session_id: String::new(), - agent: params.agent.clone(), - description: params.description.clone(), - }); + if emit_start { + if let Some(ref tx) = event_tx { + let _ = tx.send(AgentEvent::SubagentStart { + task_id: task_id.clone(), + session_id: session_id.clone(), + parent_session_id: parent_session_id.unwrap_or_default().to_string(), + agent: params.agent.clone(), + description: params.description.clone(), + }); + } } // Build a child ToolExecutor. Task tools are intentionally omitted @@ -324,18 +346,44 @@ impl TaskExecutor { /// Execute a task in the background. /// - /// Returns immediately with the task ID. Use events to track progress. + /// Returns immediately with the task ID; the same id is used in the emitted + /// `SubagentStart`/`SubagentEnd` events so callers can correlate. Pre-emits + /// `SubagentStart` synchronously when an event channel is available so a + /// caller that queries the subagent task tracker right after this call + /// observes the task in `Running` state without a race window. pub fn execute_background( self: Arc, params: TaskParams, event_tx: Option>, + parent_session_id: Option, ) -> String { let task_id = format!("task-{}", uuid::Uuid::new_v4()); - let task_id_clone = task_id.clone(); + let session_id = format!("task-run-{}", task_id); + + if let Some(ref tx) = event_tx { + let _ = tx.send(AgentEvent::SubagentStart { + task_id: task_id.clone(), + session_id, + parent_session_id: parent_session_id.clone().unwrap_or_default(), + agent: params.agent.clone(), + description: params.description.clone(), + }); + } + let task_id_for_spawn = task_id.clone(); + let task_id_for_log = task_id.clone(); tokio::spawn(async move { - if let Err(e) = self.execute(params, event_tx).await { - tracing::error!("Background task {} failed: {}", task_id_clone, e); + if let Err(e) = self + .execute_with_task_id( + task_id_for_spawn, + params, + event_tx, + parent_session_id.as_deref(), + false, + ) + .await + { + tracing::error!("Background task {} failed: {}", task_id_for_log, e); } }); @@ -350,20 +398,26 @@ impl TaskExecutor { self: &Arc, tasks: Vec, event_tx: Option>, + parent_session_id: Option<&str>, ) -> Vec { let fallback_agents = tasks .iter() .map(|params| params.agent.clone()) .collect::>(); let executor = Arc::clone(self); + let parent_session_id = parent_session_id.map(|s| s.to_string()); let results = crate::ordered_parallel::run_ordered_parallel_with_limit( tasks, self.max_parallel_tasks, move |_idx, params| { let executor = Arc::clone(&executor); let tx = event_tx.clone(); + let parent = parent_session_id.clone(); async move { - match executor.execute(params.clone(), tx).await { + match executor + .execute(params.clone(), tx, parent.as_deref()) + .await + { Ok(result) => result, Err(e) => TaskResult { output: format!("Task failed: {}", e), @@ -477,8 +531,11 @@ impl Tool for TaskTool { serde_json::from_value(args.clone()).context("Invalid task parameters")?; if params.background { - let task_id = - Arc::clone(&self.executor).execute_background(params, ctx.agent_event_tx.clone()); + let task_id = Arc::clone(&self.executor).execute_background( + params, + ctx.agent_event_tx.clone(), + ctx.session_id.clone(), + ); return Ok(ToolOutput::success(format!( "Task started in background. Task ID: {}", task_id @@ -487,7 +544,11 @@ impl Tool for TaskTool { let result = self .executor - .execute(params, ctx.agent_event_tx.clone()) + .execute( + params, + ctx.agent_event_tx.clone(), + ctx.session_id.as_deref(), + ) .await?; let (content, truncated) = format_task_result_for_context(&result); let metadata = serde_json::json!({ @@ -608,7 +669,11 @@ impl Tool for ParallelTaskTool { let results = self .executor - .execute_parallel(params.tasks, ctx.agent_event_tx.clone()) + .execute_parallel( + params.tasks, + ctx.agent_event_tx.clone(), + ctx.session_id.as_deref(), + ) .await; // Format results with compact per-task excerpts for parent context. @@ -1576,6 +1641,7 @@ mod tests { max_steps: Some(3), }, None, + None, ) .await .unwrap(); @@ -1628,6 +1694,7 @@ mod tests { max_steps: Some(3), }, None, + None, ) .await .unwrap(); @@ -1676,6 +1743,7 @@ mod tests { max_steps: Some(3), }, None, + None, ) .await .unwrap(); @@ -1730,6 +1798,7 @@ mod tests { max_steps: Some(2), }, None, + None, ) .await .unwrap(); @@ -1776,7 +1845,7 @@ mod tests { let results = tokio::time::timeout( Duration::from_secs(2), - executor.execute_parallel(tasks, None), + executor.execute_parallel(tasks, None, None), ) .await .expect("parallel children should reach the barrier and complete"); @@ -1816,7 +1885,7 @@ mod tests { }) .collect::>(); - let results = executor.execute_parallel(tasks, None).await; + let results = executor.execute_parallel(tasks, None, None).await; assert_eq!(results.len(), 5); assert!(results.iter().all(|result| result.success)); @@ -1849,7 +1918,7 @@ mod tests { }, ]; - let results = executor.execute_parallel(tasks, None).await; + let results = executor.execute_parallel(tasks, None, None).await; assert_eq!(results.len(), 2); assert!(!results[0].success); @@ -1887,7 +1956,7 @@ mod tests { }, ]; - let results = executor.execute_parallel(tasks, Some(tx)).await; + let results = executor.execute_parallel(tasks, Some(tx), None).await; assert_eq!(results.len(), 2); tokio::time::sleep(Duration::from_millis(20)).await; @@ -2002,7 +2071,7 @@ mod tests { }, ]; - let results = executor.execute_parallel(tasks, None).await; + let results = executor.execute_parallel(tasks, None, None).await; assert_eq!(results.len(), 2); for result in &results { diff --git a/core/tests/test_task_permission_inheritance.rs b/core/tests/test_task_permission_inheritance.rs index 5b1641e..b9bb752 100644 --- a/core/tests/test_task_permission_inheritance.rs +++ b/core/tests/test_task_permission_inheritance.rs @@ -145,7 +145,7 @@ async fn test_task_executor_child_run_inherits_permissions() { }; let result = executor - .execute(params, None) + .execute(params, None, None) .await .expect("TaskExecutor::execute should not return Err"); @@ -226,7 +226,7 @@ async fn test_parallel_task_executor_inherits_permissions() { }, ]; - let results = executor.execute_parallel(tasks, None).await; + let results = executor.execute_parallel(tasks, None, None).await; assert_eq!(results.len(), 2, "should have 2 results");