diff --git a/apps/docs/content/docs/cn/code/api-contract.mdx b/apps/docs/content/docs/cn/code/api-contract.mdx index 269179e..0bc1cea 100644 --- a/apps/docs/content/docs/cn/code/api-contract.mdx +++ b/apps/docs/content/docs/cn/code/api-contract.mdx @@ -343,7 +343,17 @@ const resumed = agent.resumeSession('docs-contract', { console.log(resumed.history()); ``` -Node 进程需要及时释放 session 级后台资源时,调用 `session.close()`。 +Node 进程需要及时释放 session 级后台资源时,调用 `session.close()`。`close()` 是完整的优雅停止入口:把 `session.isClosed` 翻成 `true`(之后 `send` / `stream` 会以 `Session closed` 错误立即返回),fire session 级 `CancellationToken` 让所有 in-flight run、委派子代理任务、HITL 待确认全部中止,并对当前活跃 run emit AHP `recordRunCancelled` 钩子。重复调用 `close()` 是 no-op。 + +控制面只持有 session ID 时,可以从 Agent 侧触发同样的清理: + +```ts +await agent.listSessions(); // ['session-a', 'session-b'] +await agent.closeSession('session-a'); // 若原本是 open,返回 true +await agent.close(); // 关闭所有活 session + 断开全局 MCP +``` + +`agent.close()` 之后,再调 `agent.session(...)` / `agent.resumeSession(...)` 会立即抛 `Session closed`。幂等。建议在进程退出 handler 中调用,保证没有 session 级 worker 比 agent 活得更久。 ## Delegation @@ -452,3 +462,162 @@ new UnixSocketTransport('/tmp/a3s.sock').kind; // 'unix_socket' ``` 该检查不断言 live AHP server exchange。 + +## 集群级扩展点 + +这些契约让集群控制面(例如 书安OS)在**不 fork 框架**的前提下接入多租户、成本管控和容错运行。框架定义"决策点"和"结构化事件",**策略实现由 host 提供**。 + +### 身份标签 + +`SessionOptions` 上四个可选 slot,会透传到 hooks / traces / `SessionData`,框架本身不解释: + +```ts +const session = agent.session(workspace, { + tenantId: 'acme-prod', + principal: 'svc-deploy-bot', + agentTemplateId: 'ci-runner-v7', + correlationId: 'trace-1234abcd', + sessionStore: new FileSessionStore('./sessions'), +}); +session.tenantId; // -> 'acme-prod' +session.correlationId; // -> 'trace-1234abcd' +``` + +resume 时 `apply_persisted_runtime_options` 会从持久化快照里还原标签;但**调用方在 resume_session 时传的 opts 优先**,可以借此 relabel。 + +### 预算 / 成本守卫 + +`BudgetGuard` 在每次 LLM 调用前(以及调用后做用量记录)被询问。`Deny` 返回 `CodeError::BudgetExhausted { resource, reason }`;`SoftLimit` 发射 
`AgentEvent::BudgetThresholdHit { kind: "soft", .. }` 后继续执行。 + +目前仅 Rust 层接入(Node/Python wrapper 后续补): + +```rust +let guard: Arc<dyn BudgetGuard> = /* host-supplied impl */; +let opts = SessionOptions::new().with_budget_guard(guard); +``` + +### 集群事件词汇 + +`AgentEvent`(`#[non_exhaustive]`)新增三类平台级事件,host 通过 `HookExecutor` 注入: + +- `BudgetThresholdHit { resource, kind, consumed, limit, message? }` +- `PassivationRequested { reason, deadline_ms? }` +- `PeerInvocation { from_session_id, from_tenant_id?, correlation_id? }` + +session 内部 hook 可统一订阅,不必关心 host 用什么传输发过来。 + +### 确定性 ID / 时钟 + +`HostEnv { id_generator, clock }` 替换默认的 `uuid::Uuid::new_v4()` + 墙上时钟。Replay 工具传入 `SequentialIdGenerator` + `FixedClock` 即可在另一台机器上 bit-identical 重放一个 run。 + +### Loop checkpoint + run 恢复 + +配置了 `SessionStore` 后,agent loop **每次 tool round 结束**会持久化一个 `LoopCheckpoint`(按 `run_id` 索引)。任何拥有同一个 store 的节点都能从最近的边界 rehydrate: + +```ts +// Node:host 探测到 A 节点死掉;在 B 节点上: +const session = agentB.session(workspace, { + sessionStore: new FileSessionStore('./sessions'), + sessionId: 'session-from-node-a', +}); +const result = await session.resumeRun('run-id-from-node-a'); +``` + +```python +# Python 等价 +opts = SessionOptions() +opts.session_store = FileSessionStore('./sessions') +opts.session_id = 'session-from-node-a' +session = agent_b.session(workspace, opts) +result = session.resume_run('run-id-from-node-a') +``` + +resume 出来的会**分配一个全新的 run id**:框架不假装旧 run 还在继续,新旧 run 的关系是 host 的元数据。两个可区分的错误路径方便 host 端调度分支: + +- `"resume_run requires a session_store"`:host 应该回退到新建 session。 +- `"no loop checkpoint found for run 'X'"`:host 可以稍等重试(checkpoint 写入竞态),或当 run 已丢失。 + +**边界策略**:checkpoint 只在 tool round **之间**取,不在工具执行中途取。进程在工具执行中途死掉时,这一轮的工作会丢失,LLM 从前一个边界重新思考。这是用"重试成本"换"正确性":把非幂等工具(write、bash)在边界两侧重跑比让 LLM 重想要糟得多。 + +### 长跑 session 的保留上限 + +`SessionRetentionLimits` 让 host 给四种 in-memory 存储设上限:run 记录、每 run 的事件、trace 事件、**终态的** subagent 任务快照。每个字段都是可选的(`None` 保持原本无上限的默认:短 session 没问题,小时/天级的就漏内存)。FIFO 严格按插入序丢;**Running 
状态的** subagent 任务永不被丢。 + +```rust +use a3s_code_core::retention::SessionRetentionLimits; + +let limits = SessionRetentionLimits::new() + .with_max_runs(100) + .with_max_events_per_run(5_000) + .with_max_trace_events(10_000) + .with_max_terminal_subagent_tasks(1_000); + +let opts = SessionOptions::new().with_retention_limits(limits); +``` + +上限建议跟 host 自己 Prometheus / 观测系统的内存预算保持一致。SDK 直接调用形式后续补。 + +### MCP 闲置断开 + +`Agent::disconnect_idle_mcp(threshold_ms)` 扫描所有已连接的 MCP server,把"最后活跃时间"早于 `now - threshold_ms` 的全部断开。注册的配置**保留** — 后续 tool 调用会按需重连。返回被断开的 server 名称列表。 + +```ts +// Node — 周期回收闲置 MCP 子进程 +setInterval(async () => { + const dropped = await agent.disconnectIdleMcp(5 * 60 * 1000); // 5min + if (dropped.length) { + console.log('reaped idle MCP servers:', dropped); + } +}, 60_000); +``` + +```python +# Python — 等价 +dropped = agent.disconnect_idle_mcp(5 * 60 * 1000) +``` + +每次 `connect` 和成功的 `call_tool` 都会刷新活跃时间。Host 走旁路通道路由 tool 时,可以手动 `McpManager.touch(name)` 把 server 保温。 + +### BudgetGuard 的 SDK 桥接 + +两个 SDK 共用同一个决策返回形状: + +| 返回值 | 效果 | +|-----------------------------------------------------------------------|-------------------------------------------------------------------------------| +| `None` / `null` / `{decision:'allow'}` | 静默放行 | +| `{decision:'soft', resource, consumed, limit, message?}` | 发射 `BudgetThresholdHit('soft')` 事件,继续执行 | +| `{decision:'deny', resource, reason}` | 中止调用,Python 抛 `RuntimeError("Budget exhausted...")`/Node reject 同样的错误 | + +guard 对象上缺失的方法 = 宽松默认(Allow / no-op);callback 抛错 → fallback 到 Allow,异常的 guard 不会拖垮 live session。 + +```python +# Python — 通过 SessionOptions 在 session 构造前挂上 +class MyGuard: + def check_before_llm(self, session_id, estimated_tokens): + return {"decision": "deny", "resource": "llm_tokens", "reason": "cap"} + def record_after_llm(self, session_id, usage): + track(session_id, usage["total_tokens"]) + +opts = SessionOptions() +opts.budget_guard = MyGuard() +session = agent.session(workspace, opts) +``` + +```ts 
+// Node — session 构造后通过 setBudgetGuard 挂上。 +// JsFunction 不能塞进值类型的 SessionOptions,所以 guard 在 Session 上注册, +// 下一次 send/stream 生效。 +session.setBudgetGuard({ + checkBeforeLlm: (sessionId, estimatedTokens) => { + if (overBudget(sessionId)) { + return { decision: 'deny', resource: 'llm_tokens', reason: 'cap' }; + } + return null; + }, + recordAfterLlm: (sessionId, usage) => { + track(sessionId, usage.total_tokens); + }, +}); +``` + +Node 用 `setBudgetGuard(null)` 清除;Python 把 `opts.budget_guard` 设回 `None` 后重建 session。 diff --git a/apps/docs/content/docs/en/code/api-contract.mdx b/apps/docs/content/docs/en/code/api-contract.mdx index 04fa163..56a13b9 100644 --- a/apps/docs/content/docs/en/code/api-contract.mdx +++ b/apps/docs/content/docs/en/code/api-contract.mdx @@ -351,7 +351,26 @@ console.log(resumed.history()); ``` Use `session.close()` when a Node process should release session-scoped -background resources promptly. +background resources promptly. `close()` is the full graceful-stop entry +point: it flips `session.isClosed` to `true` (further `send` / `stream` +calls reject with a `Session closed` error), fires the session-level +`CancellationToken` so every in-flight run, delegated subagent task, and +HITL confirmation aborts, and emits the AHP `recordRunCancelled` hook for +the currently active run. Subsequent `close()` calls are no-ops. + +For control-plane callers that only know the session ID, the same cleanup +is reachable from the agent: + +```ts +await agent.listSessions(); // ['session-a', 'session-b'] +await agent.closeSession('session-a'); // true if it was open +await agent.close(); // close every live session + disconnect global MCP +``` + +After `agent.close()`, subsequent `agent.session(...)` and +`agent.resumeSession(...)` calls reject with a `Session closed` error. +Idempotent. Use this in process-shutdown handlers to guarantee no +session-scoped workers outlive the agent. 
## Delegation @@ -461,3 +480,204 @@ new UnixSocketTransport('/tmp/a3s.sock').kind; // 'unix_socket' ``` The check does not assert a live AHP server exchange. + +## Cluster-grade extension points + +These contracts let a cluster control plane (e.g. 书安OS) wire +multi-tenancy, cost governance, and crash-tolerant runs **without +forking the framework**. The framework defines decision points and +emits structured events; the host supplies the policy implementations. + +### Identity labels + +Four optional `SessionOptions` slots are propagated through hooks, +traces, and `SessionData` but never interpreted by the framework: + +```ts +const session = agent.session(workspace, { + tenantId: 'acme-prod', + principal: 'svc-deploy-bot', + agentTemplateId: 'ci-runner-v7', + correlationId: 'trace-1234abcd', + sessionStore: new FileSessionStore('./sessions'), +}); +session.tenantId; // -> 'acme-prod' +session.correlationId; // -> 'trace-1234abcd' +``` + +`apply_persisted_runtime_options` restores them on resume; caller- +supplied options on resume take precedence so you can relabel. + +### Budget / cost guard + +`BudgetGuard` is consulted before every LLM call (and after, for +usage accounting). `Deny` returns +`CodeError::BudgetExhausted { resource, reason }`; `SoftLimit` emits +an `AgentEvent::BudgetThresholdHit { kind: "soft", .. }` and proceeds. + +Wire from Rust today (Node/Python wrappers will follow): + +```rust +let guard: Arc<dyn BudgetGuard> = /* host-supplied impl */; +let opts = SessionOptions::new().with_budget_guard(guard); +``` + +### Cluster event vocabulary + +`AgentEvent` (non-exhaustive) carries platform-level events the host +emits via `HookExecutor`: + +- `BudgetThresholdHit { resource, kind, consumed, limit, message? }` +- `PassivationRequested { reason, deadline_ms? }` +- `PeerInvocation { from_session_id, from_tenant_id?, correlation_id? }` + +In-session hooks subscribe to these to react uniformly regardless of +how the host's transport delivers them. 
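A minimal Python sketch of how a host-side hook might route the three platform events listed above. The `ClusterEventRouter` class and the dict-shaped events with an `event` key are illustrative assumptions, not part of the SDK; only the variant and field names come from the list above. Because `AgentEvent` is non-exhaustive, the sketch records unknown variants instead of failing on them.

```python
from typing import Callable, Dict, List, Optional

Handler = Callable[[dict], str]


class ClusterEventRouter:
    """Hypothetical dispatcher for platform-level events (not an SDK class)."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Handler] = {}
        self.unhandled: List[str] = []

    def on(self, name: str, handler: Handler) -> None:
        """Register one handler per event variant name."""
        self._handlers[name] = handler

    def dispatch(self, event: dict) -> Optional[str]:
        """Route an event; unknown variants are recorded and ignored."""
        name = event.get("event", "")
        handler = self._handlers.get(name)
        if handler is None:
            # AgentEvent is non-exhaustive: tolerate variants added later.
            self.unhandled.append(name)
            return None
        return handler(event)


router = ClusterEventRouter()
router.on("BudgetThresholdHit", lambda e: f"alert:{e['resource']}:{e['kind']}")
router.on("PassivationRequested", lambda e: f"passivate:{e['reason']}")
router.on("PeerInvocation", lambda e: f"peer:{e['from_session_id']}")
```

Each handler here returns a short action string so that the routing is easy to inspect; a real host would presumably trigger alerts or scheduling decisions instead.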
+ +### Deterministic IDs / time + +`HostEnv { id_generator, clock }` replaces the default +`uuid::Uuid::new_v4()` + wall-clock pair. Replay tooling configures +`SequentialIdGenerator` + `FixedClock` to recreate a run bit-identical +on another node. + +### Loop checkpoints + run resumption + +When a `SessionStore` is configured, the agent loop persists a +`LoopCheckpoint` after each completed tool round, keyed by `run_id`. +Any node holding the same store can rehydrate a run from its last +boundary: + +```ts +// Node — host detected node A died mid-run; on node B: +const session = agentB.session(workspace, { + sessionStore: new FileSessionStore('./sessions'), + sessionId: 'session-from-node-a', +}); +const result = await session.resumeRun('run-id-from-node-a'); +``` + +```python +# Python equivalent +opts = SessionOptions() +opts.session_store = FileSessionStore('./sessions') +opts.session_id = 'session-from-node-a' +session = agent_b.session(workspace, opts) +result = session.resume_run('run-id-from-node-a') +``` + +A **new** run id is allocated for the resumed work — the framework +does not pretend the old run continues. Two distinguishable error +paths: + +- `"resume_run requires a session_store"` — host should fall back to + a fresh session. +- `"no loop checkpoint found for run 'X'"` — host can retry later + (race against checkpoint write) or treat the run as lost. + +Boundary policy: checkpoints are taken **only between tool rounds**, +never mid-tool. If a process dies mid-tool the work of that round is +lost; the LLM re-deliberates from the previous boundary. This trades +retry cost for correctness — re-executing a non-idempotent tool +across the boundary is worse than re-asking the LLM. + +### Retention caps for long-running sessions + +`SessionRetentionLimits` lets the host cap the four in-memory stores +that grow with session age: the run records, per-run event buffers, +trace events, and **terminal** subagent task snapshots. 
Each cap is +optional (`None` keeps the unbounded default — fine for short +sessions, a memory leak for hour- or day-long ones). Eviction is +strict FIFO; running subagent tasks are never dropped. + +```rust +use a3s_code_core::retention::SessionRetentionLimits; + +let limits = SessionRetentionLimits::new() + .with_max_runs(100) + .with_max_events_per_run(5_000) + .with_max_trace_events(10_000) + .with_max_terminal_subagent_tasks(1_000); + +let opts = SessionOptions::new().with_retention_limits(limits); +``` + +The host should pick caps from the same observability budget that +caps the rest of its in-memory state (Prometheus carries history +anyway). SDK shapes for retention land in a follow-up. + +### MCP idle disconnect + +`Agent::disconnect_idle_mcp(threshold_ms)` walks the connected MCP +servers and drops any whose last activity is older than +`now - threshold_ms`. The server's *registered config* stays — a +later tool call will reconnect on demand. Returns the names of +disconnected servers. + +```ts +// Node — periodically reap quiet MCP subprocesses. +setInterval(async () => { + const dropped = await agent.disconnectIdleMcp(5 * 60 * 1000); // 5 min + if (dropped.length) { + console.log('reaped idle MCP servers:', dropped); + } +}, 60_000); +``` + +```python +# Python — same shape. +dropped = agent.disconnect_idle_mcp(5 * 60 * 1000) +``` + +Activity is stamped on `connect` and on every successful `call_tool`. +Hosts that route tool traffic through a side channel can call +`McpManager.touch(name)` to manually keep a server warm. 
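The idle rule described above can be modelled in a few lines of Python. This is a toy sketch, not the real `McpManager`: the `IdleTracker` class, its `connect` method, and the injectable clock are assumptions made for illustration. It shows only the documented behaviour: activity is stamped on connect and on `touch`, servers whose last activity is older than `now - threshold_ms` are dropped, and the registered config survives the disconnect.

```python
import time
from typing import Callable, Dict, List, Set


def _monotonic_ms() -> int:
    return int(time.monotonic() * 1000)


class IdleTracker:
    """Toy model of last-activity bookkeeping (hypothetical, not the SDK)."""

    def __init__(self, now_ms: Callable[[], int] = _monotonic_ms) -> None:
        self._now_ms = now_ms
        self.registered: Set[str] = set()       # configs that survive disconnects
        self._last_active: Dict[str, int] = {}  # connected servers only

    def connect(self, name: str) -> None:
        """Connecting registers the server and stamps its activity time."""
        self.registered.add(name)
        self._last_active[name] = self._now_ms()

    def touch(self, name: str) -> None:
        """Refresh the activity stamp of a connected server."""
        if name in self._last_active:
            self._last_active[name] = self._now_ms()

    def disconnect_idle(self, threshold_ms: int) -> List[str]:
        """Drop servers idle for longer than threshold_ms; return their names."""
        cutoff = self._now_ms() - threshold_ms
        idle = [n for n, t in self._last_active.items() if t < cutoff]
        for name in idle:
            del self._last_active[name]  # the registered config is kept
        return idle
```

Injecting the clock keeps the model deterministic under test; with the default monotonic clock the same calls work against real elapsed time.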
+ +### BudgetGuard SDK bridges + +Both SDKs accept the same decision shape: + +| Return | Effect | +|-----------------------------------------------------------------------|-----------------------------------------------------------------------| +| `None` / `null` / `{decision:'allow'}` | proceed silently | +| `{decision:'soft', resource, consumed, limit, message?}` | emit `BudgetThresholdHit('soft')` event, proceed | +| `{decision:'deny', resource, reason}` | abort the call, throw `RuntimeError("Budget exhausted...")` (Python) | +| | / reject with `"Budget exhausted..."` (Node) | + +Missing methods on the guard object are treated as a permissive +default (Allow / no-op). Callback errors fall back to Allow — a +misbehaving guard cannot halt a live session. + +```python +# Python — attach via SessionOptions before agent.session(...) +class MyGuard: + def check_before_llm(self, session_id, estimated_tokens): + return {"decision": "deny", "resource": "llm_tokens", "reason": "cap"} + def record_after_llm(self, session_id, usage): + track(session_id, usage["total_tokens"]) + +opts = SessionOptions() +opts.budget_guard = MyGuard() +session = agent.session(workspace, opts) +``` + +```ts +// Node — attach via session.setBudgetGuard after construction. +// JsFunction values can't live inside the value-typed SessionOptions, +// so the guard is installed on the Session itself; takes effect on +// the next send/stream. +session.setBudgetGuard({ + checkBeforeLlm: (sessionId, estimatedTokens) => { + if (overBudget(sessionId)) { + return { decision: 'deny', resource: 'llm_tokens', reason: 'cap' }; + } + return null; + }, + recordAfterLlm: (sessionId, usage) => { + track(sessionId, usage.total_tokens); + }, +}); +``` + +Pass `null` to `setBudgetGuard` (Node) or set `opts.budget_guard = +None` and re-create the session (Python) to clear. 
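As an illustration of the decision shape in the table above, here is a sketch of a per-session token-cap guard in Python. The `TokenCapGuard` name, the 80% soft threshold, and the integer types for `consumed` and `limit` are assumptions; the method names, the `usage["total_tokens"]` field, and the returned dict keys follow the examples above.

```python
from collections import defaultdict
from typing import Dict, Optional


class TokenCapGuard:
    """Hypothetical guard: soft warning near the cap, deny once over it."""

    def __init__(self, limit: int, soft_ratio: float = 0.8) -> None:
        self.limit = limit
        self.soft_at = int(limit * soft_ratio)
        self.consumed: Dict[str, int] = defaultdict(int)

    def check_before_llm(self, session_id: str, estimated_tokens: int) -> Optional[dict]:
        projected = self.consumed[session_id] + estimated_tokens
        if projected > self.limit:
            return {
                "decision": "deny",
                "resource": "llm_tokens",
                "reason": f"projected {projected} tokens exceeds cap {self.limit}",
            }
        if projected >= self.soft_at:
            return {
                "decision": "soft",
                "resource": "llm_tokens",
                "consumed": self.consumed[session_id],
                "limit": self.limit,
                "message": "approaching token cap",
            }
        return None  # allow silently

    def record_after_llm(self, session_id: str, usage: dict) -> None:
        # Accumulate actual usage so later checks see real consumption.
        self.consumed[session_id] += usage["total_tokens"]
```

Following the Python example above, such a guard would be attached with `opts.budget_guard = TokenCapGuard(limit=...)` before the session is created. Usage is tracked per session ID, so one guard instance can serve several sessions.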
diff --git a/crates/code b/crates/code index 6499123..4470293 160000 --- a/crates/code +++ b/crates/code @@ -1 +1 @@ -Subproject commit 6499123f2b693d6602397dfcd71336bcc5f8f41c +Subproject commit 44702931ea7f0cfb26580ea9e6e1bad58729b908