From 1a2fce40f44afaf0dfa588d926dfe929a911968f Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 29 May 2026 17:22:30 +0800 Subject: [PATCH] docs(orchestration): grammar README section + SDK examples (Phase 5e) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Documents the programmable orchestration grammar (parallel / pipeline / resumable) for both SDKs with side-by-side Python/Node snippets, including the step-spec shape, schema'd steps, the no-inter-stage-barrier pipeline semantics, and the pipeline-stage no-throw constraint. Adds runnable examples: - sdk/python/examples/orchestration_workflow.py - sdk/node/examples/orchestration/parallel-pipeline.mjs Both load .a3s/config.acl at runtime and demonstrate parallel + pipeline; syntax-checked (py_compile / node --check). Running them requires the built native addon (maturin / napi) — they double as the SDK-level validation reference for the callback bridges. Completes Workflow Phase 5 (full grammar, both SDKs). --- README.md | 63 +++++++++++++ .../orchestration/parallel-pipeline.mjs | 56 +++++++++++ sdk/python/examples/orchestration_workflow.py | 94 +++++++++++++++++++ 3 files changed, 213 insertions(+) create mode 100644 sdk/node/examples/orchestration/parallel-pipeline.mjs create mode 100644 sdk/python/examples/orchestration_workflow.py diff --git a/README.md b/README.md index 6bca3d9..b506e35 100644 --- a/README.md +++ b/README.md @@ -571,6 +571,69 @@ session2.setBudgetGuard({ --- +## Programmable Orchestration + +Beyond *model-driven* delegation (the agent calling `task`/`parallel_task`), a +session exposes a **deterministic, programmable** orchestration grammar: you +decide the fan-out, chaining, and resume in code. Steps run through the +session's `AgentExecutor`; a host (书安OS) can substitute its own executor to +place steps across a cluster — the grammar is identical either way. + +A **step** is `{ task_id, agent, description, prompt, max_steps?, +parent_session_id?, output_schema? }`. With `output_schema`, the step returns a +schema-validated object in `structured`. + +```python +# Fan out N steps; results come back in input order. A failed step is +# success=False, not a thrown error. +outcomes = session.parallel([ + {"task_id": "a", "agent": "explore", "description": "find auth", "prompt": "Where is auth handled?"}, + {"task_id": "b", "agent": "review", "description": "review", "prompt": "Review src/auth.rs", + "output_schema": {"type": "object", "properties": {"verdict": {"type": "string"}}, "required": ["verdict"]}}, +]) +print(outcomes[1]["structured"]) # {"verdict": "..."} + +# Pipeline: each item flows through stages with NO barrier between them — +# item A can be in stage 2 while item B is still in stage 1. A stage returns +# the next step (deriving from the previous outcome) or None to stop. +results = session.pipeline( + ["src/auth.rs", "src/db.rs"], + [ + lambda ctx: {"task_id": "rev", "agent": "review", "description": "review", "prompt": f"Review {ctx['item']}"}, + lambda ctx: {"task_id": "fix", "agent": "general", "description": "verify", + "prompt": f"Verify this review: {ctx['previous']['output']}"}, + ], +) + +# Resumable: progress is journaled under a workflow id via the session store, +# so an interrupted run (or one resumed on another node) skips completed steps. +outcomes = session.parallel_resumable(specs, "nightly-audit") +``` + +```javascript +// Node — identical shapes, camelCase. Promises resolve to the same outcomes. +const outcomes = await session.parallel([ + { taskId: "a", agent: "explore", description: "find auth", prompt: "Where is auth handled?" }, + { taskId: "b", agent: "review", description: "review", prompt: "Review src/auth.rs" }, +]); + +const results = await session.pipeline( + ["src/auth.rs", "src/db.rs"], + [ + (ctx) => ({ taskId: "rev", agent: "review", description: "review", prompt: `Review ${ctx.item}` }), + (ctx) => ({ taskId: "fix", agent: "general", description: "verify", + prompt: `Verify this review: ${ctx.previous.output}` }), + ], +); + +await session.parallelResumable(specs, "nightly-audit"); +// NOTE: pipeline stage callbacks MUST NOT throw — return null to stop a chain. +// A throw aborts the process (same constraint as setBudgetGuard). A stage that +// hangs past timeoutMs (default 30s) fails closed (treated as null). +``` + +--- + ## Design Principles ### 1. Small Kernel diff --git a/sdk/node/examples/orchestration/parallel-pipeline.mjs b/sdk/node/examples/orchestration/parallel-pipeline.mjs new file mode 100644 index 0000000..8f3fe4f --- /dev/null +++ b/sdk/node/examples/orchestration/parallel-pipeline.mjs @@ -0,0 +1,56 @@ +/** + * Programmable orchestration via the A3S Code Node SDK. + * + * session.parallel(specs) — fan out steps, results in input order + * session.pipeline(items, stages) — per-item chains, no inter-stage barrier + * session.parallelResumable(specs, id) — journaled, resumable fan-out + * + * Config is read from .a3s/config.acl at runtime (never hardcoded). + * Run: node sdk/node/examples/orchestration/parallel-pipeline.mjs + */ +import { createRequire } from 'node:module'; +import { existsSync } from 'node:fs'; +import { dirname, join } from 'node:path'; +import { fileURLToPath } from 'node:url'; + +const require = createRequire(import.meta.url); +const { Agent } = require('@a3s-lab/code'); + +function repoConfigPath() { + let dir = dirname(fileURLToPath(import.meta.url)); + for (let i = 0; i < 10; i++) { + const candidate = join(dir, '.a3s', 'config.acl'); + if (existsSync(candidate)) return candidate; + dir = dirname(dir); + } + throw new Error('could not locate .a3s/config.acl above this example'); +} + +async function main() { + const agent = await Agent.create(repoConfigPath()); + const session = agent.session('.', {}); + + // 1. parallel — independent steps; outcomes in input order. + const outcomes = await session.parallel([ + { taskId: 'langs', agent: 'general', description: 'list', prompt: 'Name three systems languages.', maxSteps: 2 }, + { taskId: 'safe', agent: 'general', description: 'classify', prompt: 'Is Rust memory-safe without a GC? yes/no.', maxSteps: 2 }, + ]); + for (const o of outcomes) console.log(`[parallel] ${o.taskId}: success=${o.success}`); + + // 2. pipeline — stage 2 builds on stage 1's output. Return null to stop a + // chain. A stage callback MUST NOT throw (return null on error). + const results = await session.pipeline( + ['the Rust programming language'], + [ + (ctx) => ({ taskId: 'sum', agent: 'general', description: 'summarize', prompt: `In one sentence, what is ${ctx.item}?`, maxSteps: 2 }), + (ctx) => ({ taskId: 'cls', agent: 'general', description: 'classify', + prompt: `Reply YES or NO: does this describe a programming language?\n\n${ctx.previous.output}`, maxSteps: 2 }), + ], + ); + for (const r of results) console.log(`[pipeline] final=${r === null ? null : JSON.stringify(r.output.slice(0, 60))}`); +} + +main().catch((err) => { + console.error(err); + process.exit(1); +}); diff --git a/sdk/python/examples/orchestration_workflow.py b/sdk/python/examples/orchestration_workflow.py new file mode 100644 index 0000000..a020d06 --- /dev/null +++ b/sdk/python/examples/orchestration_workflow.py @@ -0,0 +1,94 @@ +#!/usr/bin/env python3 +""" +Programmable orchestration via the A3S Code Python SDK. + +Demonstrates the deterministic grammar exposed on a session: + - session.parallel(specs) — fan out steps, results in input order + - session.pipeline(items, stages) — per-item chains, no inter-stage barrier + - session.parallel_resumable(...) — journaled, resumable fan-out + +Config is loaded from .a3s/config.acl at runtime (never hardcoded). + +Run: cd crates/code/sdk/python && python examples/orchestration_workflow.py +""" + +import sys +from pathlib import Path + +from a3s_code import Agent, SessionOptions + + +def repo_config_path() -> Path: + """Find .a3s/config.acl walking up from this file.""" + here = Path(__file__).resolve() + for parent in here.parents: + candidate = parent / ".a3s" / "config.acl" + if candidate.exists(): + return candidate + raise SystemExit("could not locate .a3s/config.acl above this example") + + +def main() -> int: + config = str(repo_config_path()) + agent = Agent.create(config) + session = agent.session(".", SessionOptions()) + + # 1. parallel — fan out independent steps; outcomes come back in order. + outcomes = session.parallel( + [ + { + "task_id": "langs", + "agent": "general", + "description": "list languages", + "prompt": "Name three systems programming languages, comma-separated.", + "max_steps": 2, + }, + { + "task_id": "verdict", + "agent": "general", + "description": "classify", + "prompt": "Is Rust memory-safe without a GC? Answer yes or no.", + "max_steps": 2, + # Schema-validated structured output for this step. + "output_schema": { + "type": "object", + "properties": {"memory_safe": {"type": "boolean"}}, + "required": ["memory_safe"], + }, + }, + ] + ) + for o in outcomes: + print(f"[parallel] {o['task_id']}: success={o['success']} structured={o.get('structured')}") + + # 2. pipeline — each item chains through stages; stage 2 builds on stage 1. + # Return None from a stage (or raise — caught and treated as None) to + # stop that item's chain. + results = session.pipeline( + ["the Rust programming language"], + [ + lambda ctx: { + "task_id": "summarize", + "agent": "general", + "description": "summarize", + "prompt": f"In one sentence, what is {ctx['item']}?", + "max_steps": 2, + }, + lambda ctx: { + "task_id": "classify", + "agent": "general", + "description": "classify", + "prompt": "Reply with one word YES or NO: does this describe a " + f"programming language?\n\n{ctx['previous']['output']}", + "max_steps": 2, + }, + ], + ) + for r in results: + print(f"[pipeline] final={None if r is None else r['output'][:60]!r}") + + return 0 + + +if __name__ == "__main__": + sys.exit(main())