Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 67 additions & 2 deletions core/src/orchestration/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ use tokio::sync::broadcast;
/// Serializable on purpose: a host (书安OS) may ship it to another node, and
/// a future workflow checkpoint persists it. The orchestration layer assigns
/// `task_id`; everything else mirrors a delegated task.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
// `serde_json::Value` (in `output_schema`) is not `Eq`, so this derives
// `PartialEq` only.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct AgentStepSpec {
/// Stable id for this step. Flows into lifecycle events (and, later,
/// workflow checkpoints) so a step can be correlated and resumed.
Expand All @@ -30,6 +32,12 @@ pub struct AgentStepSpec {
/// Parent session id, for lifecycle-event correlation.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub parent_session_id: Option<String>,
/// When set, the step must return a value conforming to this JSON Schema.
/// The executor fulfills it (the local default coerces the step's output
/// with the structured-output machinery); the validated object lands in
/// [`StepOutcome::structured`].
#[serde(default, skip_serializing_if = "Option::is_none")]
pub output_schema: Option<serde_json::Value>,
}

impl AgentStepSpec {
Expand All @@ -47,6 +55,7 @@ impl AgentStepSpec {
prompt: prompt.into(),
max_steps: None,
parent_session_id: None,
output_schema: None,
}
}

Expand All @@ -59,16 +68,29 @@ impl AgentStepSpec {
self.parent_session_id = Some(parent_session_id.into());
self
}

/// Require this step to return a value conforming to `schema`.
pub fn with_output_schema(mut self, schema: serde_json::Value) -> Self {
self.output_schema = Some(schema);
self
}
}

/// The result of running one [`AgentStepSpec`] to completion.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
///
/// `structured` is `Some` only when the spec carried an `output_schema` and
/// the executor produced a value validated against it. (`serde_json::Value`
/// is not `Eq`, so this derives `PartialEq` only.)
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct StepOutcome {
/// Identifier of the step this outcome reports on.
pub task_id: String,
/// Session the step ran in (presumably the child task's session — confirm
/// against the executor that fills it in).
pub session_id: String,
/// Name of the agent that ran the step.
pub agent: String,
/// Free-text result of the step; for a failed outcome this carries the
/// failure message instead.
pub output: String,
/// Whether the step completed successfully.
pub success: bool,
/// Schema-validated structured output, when the step requested one.
// Omitted from the serialized form when `None`, and defaulted to `None`
// when absent on input, so payloads without the field still deserialize.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub structured: Option<serde_json::Value>,
}

impl StepOutcome {
Expand All @@ -88,6 +110,7 @@ impl StepOutcome {
agent: agent.into(),
output: message.into(),
success: false,
structured: None,
}
}
}
Expand Down Expand Up @@ -212,6 +235,7 @@ mod tests {
agent: spec.agent.clone(),
output: format!("ran: {}", spec.prompt),
success: spec.agent != "fail",
structured: None,
}
}
fn concurrency_hint(&self) -> usize {
Expand Down Expand Up @@ -289,4 +313,45 @@ mod tests {
}
assert_eq!(Bare.concurrency_hint(), DEFAULT_MAX_PARALLEL_TASKS);
}

#[test]
fn spec_and_outcome_round_trip_including_new_optional_fields() {
    // A spec with every optional field populated must survive a JSON round trip.
    let expected_schema = serde_json::json!({
        "type": "object",
        "properties": { "v": { "type": "string" } },
        "required": ["v"]
    });
    let original_spec = AgentStepSpec::new("t1", "explore", "d", "p")
        .with_max_steps(3)
        .with_parent_session_id("parent")
        .with_output_schema(expected_schema.clone());
    let spec_json = serde_json::to_string(&original_spec).unwrap();
    let decoded_spec: AgentStepSpec = serde_json::from_str(&spec_json).unwrap();
    assert_eq!(decoded_spec, original_spec);
    assert_eq!(decoded_spec.output_schema, Some(expected_schema));

    // Same for an outcome that carries a structured value.
    let original_outcome = StepOutcome {
        task_id: "t1".into(),
        session_id: "task-run-t1".into(),
        agent: "explore".into(),
        output: "ok".into(),
        success: true,
        structured: Some(serde_json::json!({ "v": "x" })),
    };
    let outcome_json = serde_json::to_string(&original_outcome).unwrap();
    let decoded_outcome: StepOutcome = serde_json::from_str(&outcome_json).unwrap();
    assert_eq!(decoded_outcome, original_outcome);

    // Backward compatibility: payloads written before the optional fields
    // existed must still deserialize, with those fields defaulting to `None`.
    let legacy_spec_json = r#"{"task_id":"t","agent":"a","description":"d","prompt":"p"}"#;
    let legacy_spec: AgentStepSpec = serde_json::from_str(legacy_spec_json).unwrap();
    assert_eq!(legacy_spec.output_schema, None);

    let legacy_outcome_json =
        r#"{"task_id":"t","session_id":"s","agent":"a","output":"o","success":true}"#;
    let legacy_outcome: StepOutcome = serde_json::from_str(legacy_outcome_json).unwrap();
    assert_eq!(legacy_outcome.structured, None);
}
}
146 changes: 144 additions & 2 deletions core/src/tools/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
//! ```

use crate::agent::{AgentConfig, AgentEvent, AgentLoop};
use crate::llm::structured::{generate_blocking, StructuredMode, StructuredRequest};
use crate::llm::LlmClient;
use crate::mcp::manager::McpManager;
use crate::orchestration::{AgentExecutor, AgentStepSpec, StepOutcome};
Expand Down Expand Up @@ -517,6 +518,7 @@ impl TaskExecutor {
prompt: params.prompt,
max_steps: params.max_steps,
parent_session_id: parent.clone(),
output_schema: None,
})
.collect();

Expand All @@ -537,6 +539,7 @@ impl From<TaskResult> for StepOutcome {
agent: r.agent,
output: r.output,
success: r.success,
structured: None,
}
}
}
Expand Down Expand Up @@ -565,14 +568,15 @@ impl AgentExecutor for TaskExecutor {
) -> StepOutcome {
let agent = spec.agent.clone();
let task_id = spec.task_id.clone();
let output_schema = spec.output_schema.clone();
let params = TaskParams {
agent: spec.agent,
description: spec.description,
prompt: spec.prompt,
background: false,
max_steps: spec.max_steps,
};
match self
let mut outcome: StepOutcome = match self
.execute_with_task_id(
task_id.clone(),
params,
Expand All @@ -583,15 +587,65 @@ impl AgentExecutor for TaskExecutor {
.await
{
Ok(result) => result.into(),
Err(e) => StepOutcome::failed(task_id, agent, format!("Task failed: {e}")),
Err(e) => return StepOutcome::failed(task_id, agent, format!("Task failed: {e}")),
};

// When the step requested structured output, coerce the (succeeded)
// free-text result to the schema. A coercion failure demotes the step
// to unsuccessful so callers never treat unvalidated text as the
// promised object.
if outcome.success {
if let Some(schema) = output_schema {
match self.coerce_to_schema(&outcome.output, schema).await {
Ok(object) => outcome.structured = Some(object),
Err(e) => {
outcome.success = false;
outcome.output =
format!("{}\n\n[structured output failed: {e}]", outcome.output);
}
}
}
}
outcome
}

/// Reports this executor's configured `max_parallel_tasks` as its
/// concurrency hint.
fn concurrency_hint(&self) -> usize {
self.max_parallel_tasks
}
}

impl TaskExecutor {
    /// Turns a step's free-text `output` into a JSON value shaped by `schema`.
    ///
    /// The text is wrapped in a conversion prompt and sent through the
    /// structured-output helper (`generate_blocking`) in `StructuredMode::Tool`
    /// with up to two repair attempts. This costs one additional LLM request
    /// on top of the step's own run.
    ///
    /// # Errors
    ///
    /// Propagates whatever error `generate_blocking` reports.
    async fn coerce_to_schema(
        &self,
        output: &str,
        schema: serde_json::Value,
    ) -> Result<serde_json::Value> {
        let prompt = format!(
            "Convert the following task result into a single JSON object that conforms to \
             the required schema. Use only information present in the result.\n\n\
             --- TASK RESULT ---\n{output}"
        );
        let system =
            String::from("You output exactly one JSON object matching the provided schema.");

        let request = StructuredRequest {
            prompt,
            system: Some(system),
            schema,
            schema_name: String::from("step_output"),
            schema_description: None,
            // Tool mode is chosen because it only needs tool-use support from
            // the provider and does not rely on response_format wiring.
            mode: StructuredMode::Tool,
            max_repair_attempts: 2,
        };

        let generated = generate_blocking(&*self.llm_client, &request).await?;
        Ok(generated.object)
    }
}

/// Get the JSON schema for TaskParams
pub fn task_params_schema() -> serde_json::Value {
serde_json::json!({
Expand Down Expand Up @@ -1570,6 +1624,94 @@ mod tests {
.unwrap_or_default()
}

/// Mock LLM client backing the schema-coercion tests.
///
/// Three kinds of request are distinguished:
/// * the pre-analysis system prompt gets the canned pre-analysis response;
/// * a request that offers the synthetic `emit_step_output` tool (the
///   structured-output coercion call) gets a tool call carrying the object;
/// * anything else — the agent's own turn — gets plain text, which ends
///   the loop.
struct SchemaCoercionClient;

#[async_trait::async_trait]
impl LlmClient for SchemaCoercionClient {
    async fn complete(
        &self,
        messages: &[Message],
        system: Option<&str>,
        tools: &[ToolDefinition],
    ) -> Result<LlmResponse> {
        let response = if system == Some(crate::prompts::PRE_ANALYSIS_SYSTEM) {
            pre_analysis_response(messages)
        } else if tools.iter().any(|tool| tool.name == "emit_step_output") {
            // The coercion call injects a synthetic tool named
            // `emit_<schema_name>`, which is `emit_step_output` here.
            MockLlmClient::tool_call_response(
                "coerce-1",
                "emit_step_output",
                serde_json::json!({ "verdict": "ok" }),
            )
        } else {
            text_response("The verdict is ok.")
        };
        Ok(response)
    }

    async fn complete_streaming(
        &self,
        _messages: &[Message],
        _system: Option<&str>,
        _tools: &[ToolDefinition],
        _cancel_token: tokio_util::sync::CancellationToken,
    ) -> Result<mpsc::Receiver<StreamEvent>> {
        // These tests never stream; reaching this is a test bug.
        Err(anyhow::anyhow!(
            "streaming is not used by schema coercion tests"
        ))
    }
}

/// JSON Schema used by the coercion tests: an object with one required
/// string property, `verdict`.
fn verdict_schema() -> serde_json::Value {
    let properties = serde_json::json!({ "verdict": { "type": "string" } });
    serde_json::json!({
        "type": "object",
        "properties": properties,
        "required": ["verdict"]
    })
}

#[tokio::test]
async fn execute_step_with_schema_coerces_structured_output() {
    // Keep the temp dir bound for the whole test so the workspace path stays valid.
    let workspace = tempfile::tempdir().unwrap();
    let workspace_path = workspace.path().to_string_lossy().into_owned();
    let registry = Arc::new(AgentRegistry::new());
    let client = Arc::new(SchemaCoercionClient);
    let executor = TaskExecutor::new(registry, client, workspace_path);

    let step = AgentStepSpec::new("step-1", "general", "assess", "Assess the thing.")
        .with_output_schema(verdict_schema());
    let outcome = executor.execute_step(step, None).await;

    assert!(outcome.success, "step should succeed: {}", outcome.output);
    let expected = Some(serde_json::json!({ "verdict": "ok" }));
    assert_eq!(
        outcome.structured, expected,
        "a schema'd step returns the validated object in `structured`"
    );
}

#[tokio::test]
async fn execute_step_without_schema_has_no_structured_output() {
    // Keep the temp dir bound for the whole test so the workspace path stays valid.
    let workspace = tempfile::tempdir().unwrap();
    let workspace_path = workspace.path().to_string_lossy().into_owned();
    let registry = Arc::new(AgentRegistry::new());
    let client = Arc::new(SchemaCoercionClient);
    let executor = TaskExecutor::new(registry, client, workspace_path);

    // No `with_output_schema` call: the spec carries no schema.
    let step = AgentStepSpec::new("step-2", "general", "assess", "Assess the thing.");
    let outcome = executor.execute_step(step, None).await;

    assert!(outcome.success, "step should succeed: {}", outcome.output);
    assert_eq!(
        outcome.structured, None,
        "no schema requested → no structured output, no coercion call"
    );
}

struct StaticLlmClient {
text: String,
}
Expand Down
Loading
Loading