Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions core/src/agent_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,8 @@ pub struct AgentSession {
current_run_id: Arc<tokio::sync::Mutex<Option<String>>>,
/// In-memory run snapshots and event replay buffer for this session.
run_store: Arc<crate::run::InMemoryRunStore>,
/// Materialized view of delegated subagent task lifecycle, populated from runtime events.
subagent_tasks: Arc<crate::subagent_task_tracker::InMemorySubagentTaskTracker>,
/// Currently executing tools observed from runtime events.
active_tools: Arc<tokio::sync::RwLock<HashMap<String, ActiveToolState>>>,
/// Compact execution traces for this session.
Expand Down Expand Up @@ -564,6 +566,34 @@ impl AgentSession {
SessionView::from_session(self).active_tools().await
}

/// Look up a delegated subagent task by id. Returns `None` if no such task
/// has been observed in this session.
pub async fn subagent_task(
&self,
task_id: &str,
) -> Option<crate::subagent_task_tracker::SubagentTaskSnapshot> {
self.subagent_tasks.get(task_id).await
}

/// Return snapshots of every delegated subagent task observed in this
/// session (including completed and failed ones), oldest first.
pub async fn subagent_tasks(&self) -> Vec<crate::subagent_task_tracker::SubagentTaskSnapshot> {
self.subagent_tasks.list_for_parent(&self.session_id).await
}

/// Return snapshots of subagent tasks still in `Running` state.
pub async fn pending_subagent_tasks(
&self,
) -> Vec<crate::subagent_task_tracker::SubagentTaskSnapshot> {
use crate::subagent_task_tracker::SubagentStatus;
self.subagent_tasks
.list_for_parent(&self.session_id)
.await
.into_iter()
.filter(|task| task.status == SubagentStatus::Running)
.collect()
}

/// Return a snapshot of the session's conversation history.
pub fn history(&self) -> Vec<Message> {
SessionView::from_session(self).history()
Expand Down
9 changes: 8 additions & 1 deletion core/src/agent_api/runtime_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub(super) struct RuntimeEventSink {
session_id: String,
hook_executor: Option<Arc<dyn crate::hooks::HookExecutor>>,
active_tools: ActiveToolMap,
subagent_tasks: Arc<crate::subagent_task_tracker::InMemorySubagentTaskTracker>,
}

impl RuntimeEventSink {
Expand All @@ -57,6 +58,7 @@ impl RuntimeEventSink {
session.session_id.clone(),
session.ahp_executor.clone(),
Arc::clone(&session.active_tools),
Arc::clone(&session.subagent_tasks),
)
}

Expand All @@ -66,13 +68,15 @@ impl RuntimeEventSink {
session_id: String,
hook_executor: Option<Arc<dyn crate::hooks::HookExecutor>>,
active_tools: ActiveToolMap,
subagent_tasks: Arc<crate::subagent_task_tracker::InMemorySubagentTaskTracker>,
) -> Self {
Self {
run_store,
run_id,
session_id,
hook_executor,
active_tools,
subagent_tasks,
}
}

Expand Down Expand Up @@ -106,7 +110,7 @@ impl RuntimeEventSink {
})
}

async fn observe(&self, event: &AgentEvent) {
pub(super) async fn observe(&self, event: &AgentEvent) {
let _ = self
.run_store
.record_event(&self.run_id, event.clone())
Expand All @@ -116,6 +120,7 @@ impl RuntimeEventSink {
.record_agent_event(event, &self.run_id, &self.session_id)
.await;
}
self.subagent_tasks.record_event(event).await;
self.apply(event).await;
}

Expand Down Expand Up @@ -200,6 +205,7 @@ mod tests {
"session-1".to_string(),
None,
Arc::clone(&active_tools),
Arc::new(crate::subagent_task_tracker::InMemorySubagentTaskTracker::new()),
);

sink.observe(&AgentEvent::ToolStart {
Expand Down Expand Up @@ -239,6 +245,7 @@ mod tests {
"session-1".to_string(),
None,
active_tools(),
Arc::new(crate::subagent_task_tracker::InMemorySubagentTaskTracker::new()),
);

sink.observe(&AgentEvent::TextDelta {
Expand Down
1 change: 1 addition & 0 deletions core/src/agent_api/session_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ pub(super) fn build_agent_session(
cancel_token: Arc::new(tokio::sync::Mutex::new(None)),
current_run_id: Arc::new(tokio::sync::Mutex::new(None)),
run_store: Arc::new(crate::run::InMemoryRunStore::new()),
subagent_tasks: Arc::new(crate::subagent_task_tracker::InMemorySubagentTaskTracker::new()),
active_tools: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
trace_sink,
verification_reports: Arc::new(RwLock::new(Vec::new())),
Expand Down
93 changes: 93 additions & 0 deletions core/src/agent_api/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2324,3 +2324,96 @@ fn test_session_command_is_available_from_queue_module() {
use crate::queue::SessionCommand;
let _ = std::marker::PhantomData::<Box<dyn SessionCommand>>;
}

#[tokio::test]
async fn subagent_events_populate_session_tracker() {
use super::runtime_events::RuntimeEventSink;
use crate::agent::AgentEvent;
use crate::subagent_task_tracker::SubagentStatus;

let agent = Agent::from_config(test_config()).await.unwrap();
let session = agent
.session("/tmp/test-ws-subagent-tracker", None)
.unwrap();

// Drive a synthetic subagent lifecycle through the session's runtime sink.
let run = session
.run_store
.create_run(session.session_id(), "parent prompt")
.await;
let sink = RuntimeEventSink::from_session(&session, &run.id);

let task_id = "task-test-1".to_string();
let child_session_id = format!("task-run-{}", task_id);

sink.observe(&AgentEvent::SubagentStart {
task_id: task_id.clone(),
session_id: child_session_id.clone(),
parent_session_id: session.session_id().to_string(),
agent: "explore".to_string(),
description: "demo delegation".to_string(),
})
.await;

let snap = session
.subagent_task(&task_id)
.await
.expect("running task should be visible");
assert_eq!(snap.status, SubagentStatus::Running);
assert_eq!(snap.parent_session_id, session.session_id());
assert_eq!(snap.child_session_id, child_session_id);
assert_eq!(snap.agent, "explore");
assert!(snap.finished_ms.is_none());

let pending = session.pending_subagent_tasks().await;
assert_eq!(pending.len(), 1);
assert_eq!(pending[0].task_id, task_id);

sink.observe(&AgentEvent::SubagentEnd {
task_id: task_id.clone(),
session_id: child_session_id,
agent: "explore".to_string(),
output: "found things".to_string(),
success: true,
})
.await;

let snap = session.subagent_task(&task_id).await.unwrap();
assert_eq!(snap.status, SubagentStatus::Completed);
assert_eq!(snap.success, Some(true));
assert_eq!(snap.output.as_deref(), Some("found things"));
assert!(snap.finished_ms.is_some());

assert!(session.pending_subagent_tasks().await.is_empty());
assert_eq!(session.subagent_tasks().await.len(), 1);
}

#[tokio::test]
async fn subagent_tasks_scope_to_parent_session() {
use super::runtime_events::RuntimeEventSink;
use crate::agent::AgentEvent;

let agent = Agent::from_config(test_config()).await.unwrap();
let session_a = agent.session("/tmp/test-ws-subagent-a", None).unwrap();
let session_b = agent.session("/tmp/test-ws-subagent-b", None).unwrap();

let run = session_a
.run_store
.create_run(session_a.session_id(), "p")
.await;
let sink = RuntimeEventSink::from_session(&session_a, &run.id);

sink.observe(&AgentEvent::SubagentStart {
task_id: "task-from-a".to_string(),
session_id: "task-run-task-from-a".to_string(),
parent_session_id: session_a.session_id().to_string(),
agent: "explore".to_string(),
description: "isolated".to_string(),
})
.await;

// session A sees the task; session B has its own (empty) tracker.
assert_eq!(session_a.subagent_tasks().await.len(), 1);
assert!(session_b.subagent_tasks().await.is_empty());
assert!(session_b.subagent_task("task-from-a").await.is_none());
}
4 changes: 4 additions & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ pub(crate) mod session_lane_queue;
pub mod skills;
pub mod store;
pub mod subagent;
pub mod subagent_task_tracker;
pub mod telemetry;
#[cfg(feature = "telemetry")]
pub mod telemetry_otel;
Expand Down Expand Up @@ -138,6 +139,9 @@ pub use subagent::{
AgentDefinition, AgentRegistry, CattleAgentKind, CattleAgentSpec, ConfirmationInheritance,
WorkerAgentKind, WorkerAgentSpec,
};
pub use subagent_task_tracker::{
InMemorySubagentTaskTracker, SubagentProgressEntry, SubagentStatus, SubagentTaskSnapshot,
};
pub use tools::ToolErrorKind;
pub use workspace::{
CommandOutput, CommandOutputObserver, CommandRequest, LocalWorkspaceBackend, RemoteGitBackend,
Expand Down
Loading
Loading