Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,69 @@ session2.setBudgetGuard({

---

## Programmable Orchestration

Beyond *model-driven* delegation (the agent calling `task`/`parallel_task`), a
session exposes a **deterministic, programmable** orchestration grammar: you
decide the fan-out, chaining, and resume in code. Steps run through the
session's `AgentExecutor`; a host (书安OS) can substitute its own executor to
place steps across a cluster — the grammar is identical either way.

A **step** is `{ task_id, agent, description, prompt, max_steps?,
parent_session_id?, output_schema? }`. With `output_schema`, the step returns a
schema-validated object in `structured`.

```python
# Fan out N steps; results come back in input order. A failed step is
# success=False, not a thrown error.
outcomes = session.parallel([
{"task_id": "a", "agent": "explore", "description": "find auth", "prompt": "Where is auth handled?"},
{"task_id": "b", "agent": "review", "description": "review", "prompt": "Review src/auth.rs",
"output_schema": {"type": "object", "properties": {"verdict": {"type": "string"}}, "required": ["verdict"]}},
])
print(outcomes[1]["structured"]) # {"verdict": "..."}

# Pipeline: each item flows through stages with NO barrier between them —
# item A can be in stage 2 while item B is still in stage 1. A stage returns
# the next step (deriving from the previous outcome) or None to stop.
results = session.pipeline(
["src/auth.rs", "src/db.rs"],
[
lambda ctx: {"task_id": "rev", "agent": "review", "description": "review", "prompt": f"Review {ctx['item']}"},
lambda ctx: {"task_id": "fix", "agent": "general", "description": "verify",
"prompt": f"Verify this review: {ctx['previous']['output']}"},
],
)

# Resumable: progress is journaled under a workflow id via the session store,
# so an interrupted run (or one resumed on another node) skips completed steps.
outcomes = session.parallel_resumable(specs, "nightly-audit")
```

```javascript
// Node — identical shapes, camelCase. Promises resolve to the same outcomes.
const outcomes = await session.parallel([
{ taskId: "a", agent: "explore", description: "find auth", prompt: "Where is auth handled?" },
{ taskId: "b", agent: "review", description: "review", prompt: "Review src/auth.rs" },
]);

const results = await session.pipeline(
["src/auth.rs", "src/db.rs"],
[
(ctx) => ({ taskId: "rev", agent: "review", description: "review", prompt: `Review ${ctx.item}` }),
(ctx) => ({ taskId: "fix", agent: "general", description: "verify",
prompt: `Verify this review: ${ctx.previous.output}` }),
],
);

await session.parallelResumable(specs, "nightly-audit");
// NOTE: pipeline stage callbacks MUST NOT throw — return null to stop a chain.
// A throw aborts the process (same constraint as setBudgetGuard). A stage that
// hangs past timeoutMs (default 30s) fails closed (treated as null).
```

---

## Design Principles

### 1. Small Kernel
Expand Down
56 changes: 56 additions & 0 deletions sdk/node/examples/orchestration/parallel-pipeline.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/**
* Programmable orchestration via the A3S Code Node SDK.
*
* session.parallel(specs) — fan out steps, results in input order
* session.pipeline(items, stages) — per-item chains, no inter-stage barrier
* session.parallelResumable(specs, id) — journaled, resumable fan-out
*
* Config is read from .a3s/config.acl at runtime (never hardcoded).
* Run: node sdk/node/examples/orchestration/parallel-pipeline.mjs
*/
import { createRequire } from 'node:module';
import { existsSync } from 'node:fs';
import { dirname, join } from 'node:path';
import { fileURLToPath } from 'node:url';

const require = createRequire(import.meta.url);
const { Agent } = require('@a3s-lab/code');

function repoConfigPath() {
let dir = dirname(fileURLToPath(import.meta.url));
for (let i = 0; i < 10; i++) {
const candidate = join(dir, '.a3s', 'config.acl');
if (existsSync(candidate)) return candidate;
dir = dirname(dir);
}
throw new Error('could not locate .a3s/config.acl above this example');
}

async function main() {
const agent = await Agent.create(repoConfigPath());
const session = agent.session('.', {});

// 1. parallel — independent steps; outcomes in input order.
const outcomes = await session.parallel([
{ taskId: 'langs', agent: 'general', description: 'list', prompt: 'Name three systems languages.', maxSteps: 2 },
{ taskId: 'safe', agent: 'general', description: 'classify', prompt: 'Is Rust memory-safe without a GC? yes/no.', maxSteps: 2 },
]);
for (const o of outcomes) console.log(`[parallel] ${o.taskId}: success=${o.success}`);

// 2. pipeline — stage 2 builds on stage 1's output. Return null to stop a
// chain. A stage callback MUST NOT throw (return null on error).
const results = await session.pipeline(
['the Rust programming language'],
[
(ctx) => ({ taskId: 'sum', agent: 'general', description: 'summarize', prompt: `In one sentence, what is ${ctx.item}?`, maxSteps: 2 }),
(ctx) => ({ taskId: 'cls', agent: 'general', description: 'classify',
prompt: `Reply YES or NO: does this describe a programming language?\n\n${ctx.previous.output}`, maxSteps: 2 }),
],
);
for (const r of results) console.log(`[pipeline] final=${r === null ? null : JSON.stringify(r.output.slice(0, 60))}`);
}

main().catch((err) => {
console.error(err);
process.exit(1);
});
94 changes: 94 additions & 0 deletions sdk/python/examples/orchestration_workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
#!/usr/bin/env python3
"""
Programmable orchestration via the A3S Code Python SDK.

Demonstrates the deterministic grammar exposed on a session:
- session.parallel(specs) — fan out steps, results in input order
- session.pipeline(items, stages) — per-item chains, no inter-stage barrier
- session.parallel_resumable(...) — journaled, resumable fan-out

Config is loaded from .a3s/config.acl at runtime (never hardcoded).

Run: cd crates/code/sdk/python && python examples/orchestration_workflow.py
"""

import sys
from pathlib import Path

from a3s_code import Agent, SessionOptions


def repo_config_path() -> Path:
"""Find .a3s/config.acl walking up from this file."""
here = Path(__file__).resolve()
for parent in here.parents:
candidate = parent / ".a3s" / "config.acl"
if candidate.exists():
return candidate
raise SystemExit("could not locate .a3s/config.acl above this example")


def main() -> int:
config = str(repo_config_path())
agent = Agent.create(config)
session = agent.session(".", SessionOptions())

# 1. parallel — fan out independent steps; outcomes come back in order.
outcomes = session.parallel(
[
{
"task_id": "langs",
"agent": "general",
"description": "list languages",
"prompt": "Name three systems programming languages, comma-separated.",
"max_steps": 2,
},
{
"task_id": "verdict",
"agent": "general",
"description": "classify",
"prompt": "Is Rust memory-safe without a GC? Answer yes or no.",
"max_steps": 2,
# Schema-validated structured output for this step.
"output_schema": {
"type": "object",
"properties": {"memory_safe": {"type": "boolean"}},
"required": ["memory_safe"],
},
},
]
)
for o in outcomes:
print(f"[parallel] {o['task_id']}: success={o['success']} structured={o.get('structured')}")

# 2. pipeline — each item chains through stages; stage 2 builds on stage 1.
# Return None from a stage (or raise — caught and treated as None) to
# stop that item's chain.
results = session.pipeline(
["the Rust programming language"],
[
lambda ctx: {
"task_id": "summarize",
"agent": "general",
"description": "summarize",
"prompt": f"In one sentence, what is {ctx['item']}?",
"max_steps": 2,
},
lambda ctx: {
"task_id": "classify",
"agent": "general",
"description": "classify",
"prompt": "Reply with one word YES or NO: does this describe a "
f"programming language?\n\n{ctx['previous']['output']}",
"max_steps": 2,
},
],
)
for r in results:
print(f"[pipeline] final={None if r is None else r['output'][:60]!r}")

return 0


if __name__ == "__main__":
sys.exit(main())
Loading