From 279b495fd71cfeefa779b89f010e83cdcb3a7451 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 29 May 2026 16:17:54 +0800 Subject: [PATCH] feat(orchestration): schema-forced step output (Phase 2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A step can now require schema-validated structured output, mirroring Claude Code's agent(prompt, {schema}). - AgentStepSpec gains `output_schema: Option<Value>` (+ with_output_schema); StepOutcome gains `structured: Option<Value>`. Both optional + serde-default, so pre-Phase-2 payloads still load (Value isn't Eq → derives PartialEq only). - The local executor (TaskExecutor::execute_step) runs the step normally, then coerces its output to the schema via the existing structured-output machinery (generate_blocking, Tool mode — portable, with built-in repair). A coercion failure demotes the step to unsuccessful so callers never treat unvalidated text as the promised object. The executor owns schema fulfilment, so a remote host can satisfy it however it likes (the requirement travels in the spec). Tests: mock-based coercion (schema → validated object; no-schema → none), spec/outcome round-trip incl. the new fields + backward-compat, and a real-LLM \#[ignore] test (live model returns a schema-valid object via execute_step) passing against .a3s/config.acl. Task suite + orchestration tests green; fmt + clippy --lib --bins clean. 
--- core/src/orchestration/executor.rs | 69 +++++++++- core/src/tools/task.rs | 146 +++++++++++++++++++++- core/tests/test_orchestration_real_llm.rs | 93 ++++++++++++++ 3 files changed, 304 insertions(+), 4 deletions(-) create mode 100644 core/tests/test_orchestration_real_llm.rs diff --git a/core/src/orchestration/executor.rs b/core/src/orchestration/executor.rs index a2cde09..9db07c1 100644 --- a/core/src/orchestration/executor.rs +++ b/core/src/orchestration/executor.rs @@ -13,7 +13,9 @@ use tokio::sync::broadcast; /// Serializable on purpose: a host (书安OS) may ship it to another node, and /// a future workflow checkpoint persists it. The orchestration layer assigns /// `task_id`; everything else mirrors a delegated task. -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +// `serde_json::Value` (in `output_schema`) is not `Eq`, so this derives +// `PartialEq` only. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct AgentStepSpec { /// Stable id for this step. Flows into lifecycle events (and, later, /// workflow checkpoints) so a step can be correlated and resumed. @@ -30,6 +32,12 @@ pub struct AgentStepSpec { /// Parent session id, for lifecycle-event correlation. #[serde(default, skip_serializing_if = "Option::is_none")] pub parent_session_id: Option, + /// When set, the step must return a value conforming to this JSON Schema. + /// The executor fulfills it (the local default coerces the step's output + /// with the structured-output machinery); the validated object lands in + /// [`StepOutcome::structured`]. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub output_schema: Option, } impl AgentStepSpec { @@ -47,6 +55,7 @@ impl AgentStepSpec { prompt: prompt.into(), max_steps: None, parent_session_id: None, + output_schema: None, } } @@ -59,16 +68,29 @@ impl AgentStepSpec { self.parent_session_id = Some(parent_session_id.into()); self } + + /// Require this step to return a value conforming to `schema`. 
+ pub fn with_output_schema(mut self, schema: serde_json::Value) -> Self { + self.output_schema = Some(schema); + self + } } /// The result of running one [`AgentStepSpec`] to completion. -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +/// +/// `structured` is `Some` only when the spec carried an `output_schema` and +/// the executor produced a value validated against it. (`serde_json::Value` +/// is not `Eq`, so this derives `PartialEq` only.) +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct StepOutcome { pub task_id: String, pub session_id: String, pub agent: String, pub output: String, pub success: bool, + /// Schema-validated structured output, when the step requested one. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub structured: Option, } impl StepOutcome { @@ -88,6 +110,7 @@ impl StepOutcome { agent: agent.into(), output: message.into(), success: false, + structured: None, } } } @@ -212,6 +235,7 @@ mod tests { agent: spec.agent.clone(), output: format!("ran: {}", spec.prompt), success: spec.agent != "fail", + structured: None, } } fn concurrency_hint(&self) -> usize { @@ -289,4 +313,45 @@ mod tests { } assert_eq!(Bare.concurrency_hint(), DEFAULT_MAX_PARALLEL_TASKS); } + + #[test] + fn spec_and_outcome_round_trip_including_new_optional_fields() { + let schema = serde_json::json!({ + "type": "object", + "properties": { "v": { "type": "string" } }, + "required": ["v"] + }); + let spec = AgentStepSpec::new("t1", "explore", "d", "p") + .with_max_steps(3) + .with_parent_session_id("parent") + .with_output_schema(schema.clone()); + let back: AgentStepSpec = + serde_json::from_str(&serde_json::to_string(&spec).unwrap()).unwrap(); + assert_eq!(back, spec); + assert_eq!(back.output_schema, Some(schema)); + + let outcome = StepOutcome { + task_id: "t1".into(), + session_id: "task-run-t1".into(), + agent: "explore".into(), + output: "ok".into(), + success: true, + structured: Some(serde_json::json!({ "v": "x" 
})), + }; + let back: StepOutcome = + serde_json::from_str(&serde_json::to_string(&outcome).unwrap()).unwrap(); + assert_eq!(back, outcome); + + // Backward-compat: a pre-Phase-2 payload lacking the new optional + // fields still loads (they default to None). + let old_spec: AgentStepSpec = + serde_json::from_str(r#"{"task_id":"t","agent":"a","description":"d","prompt":"p"}"#) + .unwrap(); + assert_eq!(old_spec.output_schema, None); + let old_outcome: StepOutcome = serde_json::from_str( + r#"{"task_id":"t","session_id":"s","agent":"a","output":"o","success":true}"#, + ) + .unwrap(); + assert_eq!(old_outcome.structured, None); + } } diff --git a/core/src/tools/task.rs b/core/src/tools/task.rs index d713115..b304dc8 100644 --- a/core/src/tools/task.rs +++ b/core/src/tools/task.rs @@ -15,6 +15,7 @@ //! ``` use crate::agent::{AgentConfig, AgentEvent, AgentLoop}; +use crate::llm::structured::{generate_blocking, StructuredMode, StructuredRequest}; use crate::llm::LlmClient; use crate::mcp::manager::McpManager; use crate::orchestration::{AgentExecutor, AgentStepSpec, StepOutcome}; @@ -517,6 +518,7 @@ impl TaskExecutor { prompt: params.prompt, max_steps: params.max_steps, parent_session_id: parent.clone(), + output_schema: None, }) .collect(); @@ -537,6 +539,7 @@ impl From for StepOutcome { agent: r.agent, output: r.output, success: r.success, + structured: None, } } } @@ -565,6 +568,7 @@ impl AgentExecutor for TaskExecutor { ) -> StepOutcome { let agent = spec.agent.clone(); let task_id = spec.task_id.clone(); + let output_schema = spec.output_schema.clone(); let params = TaskParams { agent: spec.agent, description: spec.description, @@ -572,7 +576,7 @@ impl AgentExecutor for TaskExecutor { background: false, max_steps: spec.max_steps, }; - match self + let mut outcome: StepOutcome = match self .execute_with_task_id( task_id.clone(), params, @@ -583,8 +587,26 @@ impl AgentExecutor for TaskExecutor { .await { Ok(result) => result.into(), - Err(e) => 
StepOutcome::failed(task_id, agent, format!("Task failed: {e}")), + Err(e) => return StepOutcome::failed(task_id, agent, format!("Task failed: {e}")), + }; + + // When the step requested structured output, coerce the (succeeded) + // free-text result to the schema. A coercion failure demotes the step + // to unsuccessful so callers never treat unvalidated text as the + // promised object. + if outcome.success { + if let Some(schema) = output_schema { + match self.coerce_to_schema(&outcome.output, schema).await { + Ok(object) => outcome.structured = Some(object), + Err(e) => { + outcome.success = false; + outcome.output = + format!("{}\n\n[structured output failed: {e}]", outcome.output); + } + } + } } + outcome } fn concurrency_hint(&self) -> usize { @@ -592,6 +614,38 @@ impl AgentExecutor for TaskExecutor { } } +impl TaskExecutor { + /// Coerce a step's free-text output into a JSON object validated against + /// `schema`, reusing the structured-output machinery (Tool mode — the most + /// portable across providers, with built-in repair). This is one extra LLM + /// call beyond the step's own run. + async fn coerce_to_schema( + &self, + output: &str, + schema: serde_json::Value, + ) -> Result { + let req = StructuredRequest { + prompt: format!( + "Convert the following task result into a single JSON object that conforms to \ + the required schema. Use only information present in the result.\n\n\ + --- TASK RESULT ---\n{output}" + ), + system: Some( + "You output exactly one JSON object matching the provided schema.".to_string(), + ), + schema, + schema_name: "step_output".to_string(), + schema_description: None, + // Tool mode works on every provider that supports tool use and + // does not depend on response_format wiring. 
+ mode: StructuredMode::Tool, + max_repair_attempts: 2, + }; + let result = generate_blocking(&*self.llm_client, &req).await?; + Ok(result.object) + } +} + /// Get the JSON schema for TaskParams pub fn task_params_schema() -> serde_json::Value { serde_json::json!({ @@ -1570,6 +1624,94 @@ mod tests { .unwrap_or_default() } + /// Client for the schema-coercion tests. The agent's own turn returns + /// plain text (which ends the loop); the structured-output coercion call + /// — recognizable by the injected `step_output` tool — returns a tool call + /// carrying the object. + struct SchemaCoercionClient; + + #[async_trait::async_trait] + impl LlmClient for SchemaCoercionClient { + async fn complete( + &self, + messages: &[Message], + system: Option<&str>, + tools: &[ToolDefinition], + ) -> Result { + if system == Some(crate::prompts::PRE_ANALYSIS_SYSTEM) { + return Ok(pre_analysis_response(messages)); + } + // The structured-output coercion injects a synthetic tool named + // `emit_` (here `emit_step_output`). 
+ if tools.iter().any(|t| t.name == "emit_step_output") { + return Ok(MockLlmClient::tool_call_response( + "coerce-1", + "emit_step_output", + serde_json::json!({ "verdict": "ok" }), + )); + } + Ok(text_response("The verdict is ok.")) + } + + async fn complete_streaming( + &self, + _messages: &[Message], + _system: Option<&str>, + _tools: &[ToolDefinition], + _cancel_token: tokio_util::sync::CancellationToken, + ) -> Result> { + anyhow::bail!("streaming is not used by schema coercion tests") + } + } + + fn verdict_schema() -> serde_json::Value { + serde_json::json!({ + "type": "object", + "properties": { "verdict": { "type": "string" } }, + "required": ["verdict"] + }) + } + + #[tokio::test] + async fn execute_step_with_schema_coerces_structured_output() { + let workspace = tempfile::tempdir().unwrap(); + let executor = TaskExecutor::new( + Arc::new(AgentRegistry::new()), + Arc::new(SchemaCoercionClient), + workspace.path().to_string_lossy().to_string(), + ); + let spec = AgentStepSpec::new("step-1", "general", "assess", "Assess the thing.") + .with_output_schema(verdict_schema()); + + let outcome = executor.execute_step(spec, None).await; + + assert!(outcome.success, "step should succeed: {}", outcome.output); + assert_eq!( + outcome.structured, + Some(serde_json::json!({ "verdict": "ok" })), + "a schema'd step returns the validated object in `structured`" + ); + } + + #[tokio::test] + async fn execute_step_without_schema_has_no_structured_output() { + let workspace = tempfile::tempdir().unwrap(); + let executor = TaskExecutor::new( + Arc::new(AgentRegistry::new()), + Arc::new(SchemaCoercionClient), + workspace.path().to_string_lossy().to_string(), + ); + let spec = AgentStepSpec::new("step-2", "general", "assess", "Assess the thing."); + + let outcome = executor.execute_step(spec, None).await; + + assert!(outcome.success, "step should succeed: {}", outcome.output); + assert_eq!( + outcome.structured, None, + "no schema requested → no structured output, no 
coercion call" + ); + } + struct StaticLlmClient { text: String, } diff --git a/core/tests/test_orchestration_real_llm.rs b/core/tests/test_orchestration_real_llm.rs new file mode 100644 index 0000000..c01a048 --- /dev/null +++ b/core/tests/test_orchestration_real_llm.rs @@ -0,0 +1,93 @@ +//! Real-LLM validation of the orchestration layer (Workflow phases). +//! +//! `#[ignore]` — requires a live provider in `.a3s/config.acl`. Run: +//! +//! ```bash +//! A3S_CONFIG_FILE=/abs/path/.a3s/config.acl \ +//! cargo test -p a3s-code-core --test test_orchestration_real_llm -- --ignored --nocapture +//! ``` + +use std::path::PathBuf; +use std::sync::Arc; + +use a3s_code_core::config::CodeConfig; +use a3s_code_core::llm::create_client_with_config; +use a3s_code_core::orchestration::{AgentExecutor, AgentStepSpec}; +use a3s_code_core::subagent::AgentRegistry; +use a3s_code_core::tools::TaskExecutor; + +fn repo_config_path() -> PathBuf { + std::env::var_os("A3S_CONFIG_FILE") + .map(PathBuf::from) + .unwrap_or_else(|| { + PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("../../..") + .join(".a3s/config.acl") + }) +} + +/// Returns the executor plus the workspace guard — keep the guard in scope so +/// the temp dir is cleaned up when the test ends (no stray temp files). +fn local_executor() -> (TaskExecutor, tempfile::TempDir) { + let path = repo_config_path(); + let config = CodeConfig::from_file(&path) + .unwrap_or_else(|e| panic!("failed to load {}: {e}", path.display())); + let llm_client = + create_client_with_config(config.default_llm_config().expect("default llm config")); + let workspace = tempfile::tempdir().expect("temp workspace"); + let executor = TaskExecutor::new( + Arc::new(AgentRegistry::new()), + llm_client, + workspace.path().to_string_lossy().to_string(), + ); + (executor, workspace) +} + +/// Phase 2: a step carrying an `output_schema` runs against a live model and +/// returns a value validated against the schema in `StepOutcome::structured`. 
+/// Mock clients can't validate the structured-output coercion against a real +/// provider's tool-calling behavior — this does. +#[tokio::test(flavor = "multi_thread")] +#[ignore = "requires real provider credentials and network access"] +async fn real_execute_step_with_schema_returns_validated_object() { + let (executor, _workspace) = local_executor(); + + let schema = serde_json::json!({ + "type": "object", + "properties": { + "language": { "type": "string" }, + "is_systems_language": { "type": "boolean" } + }, + "required": ["language", "is_systems_language"] + }); + let spec = AgentStepSpec::new( + "real-schema-1", + "general", + "classify language", + "Briefly describe the Rust programming language and whether it is a systems language.", + ) + .with_output_schema(schema) + .with_max_steps(2); + + let outcome = executor.execute_step(spec, None).await; + + assert!( + outcome.success, + "schema'd step should succeed: {}", + outcome.output + ); + let object = outcome + .structured + .expect("a schema'd step must return a validated structured object"); + assert!( + object.get("language").and_then(|v| v.as_str()).is_some(), + "object has a string `language`: {object}" + ); + assert!( + object + .get("is_systems_language") + .map(|v| v.is_boolean()) + .unwrap_or(false), + "object has a boolean `is_systems_language`: {object}" + ); +}