Skip to content

feat(runtime): 对齐 PR #162 的 runtime 契约并落地可运行任务内核(状态机/事件流/子代理MVP) #180

@Lwwww1

Description

@Lwwww1

背景

PR #162 已定义 runtime 模块目标:任务状态机、多代理调度、日志增量读取、配额控制。
当前仓库 runtime 仍是占位实现(InMemoryTaskManager),尚未形成可扩展的执行内核。

现状问题

  1. 任务管理能力不完整:Retry/Stream 返回未实现,缺少 Wait 语义。
  2. 状态迁移未形成统一状态机约束(合法迁移、终态约束、错误码一致性)。
  3. 无任务日志增量读取能力(offset-based LogReader)。
  4. 子代理与配额未落地(同步/异步调度、资源争用控制)。
  5. runtime 与 agent 的边界尚未固化(TaskStatus 作为 runtime 唯一 owner 仍不够清晰)。
  6. 测试覆盖偏薄,缺少并发/失败恢复/取消传播类用例。

目标

  • 将 runtime 从占位实现升级为可用内核,并与 PR 架构设计 #162 契约对齐。
  • 保持现有 CLI/TUI 行为兼容,采用分阶段迁移。

实施计划

Phase 0:契约对齐(接口先行)

  • 对齐 runtime 对外接口:TaskManager / Scheduler / LogReader / SubAgentCoordinator / QuotaManager
  • 统一 TaskEvent/错误码/元数据字段,补齐 Wait 语义。
  • 保留兼容适配层,避免一次性改动上层调用。

Phase 1:任务状态机内核

  • 实现状态机:pending -> running -> completed|failed|killed
  • 实现合法/非法迁移校验与 invalid_transition 错误。
  • 明确终态不可再迁移规则。

Phase 2:执行闭环(超时/取消/重试)

  • 为任务引入 Timeout、取消传播、MaxRetries
  • 实现 Retry 的 attempt 递增与重试耗尽(retry_exhausted)。
  • 实现 WaitStream 事件流,保证结果可等待、可订阅。

Phase 3:日志与可回放能力

  • 增加任务日志记录结构(含 offset)。
  • 实现 LogReader.ReadIncrement(...)
  • 与 storage 建立最小桥接,支持任务日志增量读取。

Phase 4:子代理与配额(MVP)

  • 先落地同步子代理 Spawn + Wait
  • 增加基础 QuotaManager(按 key acquire/release)。
  • 异步后台归并与 worktree 隔离另开 issue。

Phase 5:上层接入与约束固化

  • agent 仅通过 runtime 发起/取消/查询任务,不直接改 TaskStatus
  • 关键任务状态变更事件可被审计链路消费。

验收标准

  1. go test ./internal/runtime -v 通过。
  2. 新增测试覆盖:
    • 状态机合法/非法迁移
    • 超时、取消、重试耗尽
    • 并发配额争用
    • 任务事件流顺序与终态一致性
  3. app/agent 侧现有流程无回归(基本 CLI/TUI 运行链路通过)。
  4. 子代理 MVP 可在同步模式下完成提交与等待。

非目标

  • 本 issue 不实现完整异步后台子代理归并。
  • 本 issue 不实现完整 worktree 隔离执行。
  • 本 issue 不涉及 provider/policy 的大规模重构。

模块架构设计(Runtime 详细版)

1. 目标边界与职责划分

  • runtimeTask 生命周期唯一 owner:创建、调度、状态迁移、取消、重试、结果收敛、日志增量读取。
  • agent/tools 只通过 TaskManager(以及后续补齐的 Scheduler/LogReader/SubAgentCoordinator/QuotaManager)交互,不直接改 Task.Status
  • core.TaskStatus 仅作为跨模块契约枚举;迁移规则只在 runtime 内部定义。
  • app/agent 保持兼容:现有调用点不强制改签名,通过适配层逐步迁移。

2. 运行时模块分层(建议落地到 internal/runtime

  1. manager(门面层)
  • 对外实现 Submit/Get/Cancel/Retry/Stream/Wait(新增)
  • 做参数校验、错误码归一、调用编排。
  1. state_machine(状态机层)
  • 负责合法迁移判定、终态保护、错误码 invalid_transition
  • 纯逻辑、可单测,不依赖 IO。
  1. scheduler(执行层)
  • 从 pending 任务拉起执行、注入 timeout/cancel/retry。
  • quota 协作控制并发资源。
  1. eventlog + stream(事件层)
  • 统一记录 TaskEvent(status/log/result/error)。
  • Stream 提供“历史回放 + 实时订阅”的顺序一致语义。
  1. log_reader(日志层)
  • 维护 offset 递增日志。
  • 提供 ReadIncrement(taskID, offset, limit)
  1. subagent(子代理层,MVP)
  • 先实现同步 Spawn + Wait
  • 继承父任务的 SessionID/TraceID,并绑定 ParentTaskID
  1. quota(配额层)
  • 基于 key 的 Acquire/Release(MVP 可用 semaphore)。
  • 超配额返回 quota_exceeded

3. 核心数据模型与契约

  • TaskID/Spec/Status/Attempt/CreatedAt/StartedAt/FinishedAt/ErrorCode
  • TaskResult:终态快照(输出、错误码、完成时间)。
  • TaskEvent:事件信封,字段至少包含 TaskID/Type/Status/Payload/ErrorCode/Timestamp
  • TaskSpec 关键字段语义:
    • Timeout:单次 attempt 的执行超时。
    • MaxRetries:失败后最多重试次数(不含首轮)。
    • ParentTaskID:子任务链路归属。
    • Background/IsolatedWorktree:本 issue 仅保留字段与兼容行为,不实现完整后台归并与隔离执行。

4. 状态机设计(唯一真源)

允许迁移:

  • pending -> running
  • pending -> killed(未启动前取消)
  • running -> completed | failed | killed
  • failed -> pending(仅当 Attempt <= MaxRetries 且触发 retry)

禁止迁移:

  • 任一终态(completed/failed/killed)直接再迁移到其他终态。
  • completed -> *killed -> *
  • running -> pending(除非通过明确 retry 流程先落 failed 再回 pending)。

错误码约束:

  • 非法迁移统一返回 invalid_transition
  • 重试耗尽返回 retry_exhausted
  • 超时统一映射 timeout(或 task_timeout,二选一后全局一致)。
  • 取消统一映射 killed 状态 + cancelled(或 task_cancelled)错误码。

5. 执行与控制流(Submit/Run/Wait/Retry/Cancel)

  1. Submit
  • 生成 TaskID,写入 pending 记录。
  • 立即写入 status(pending) 事件。
  1. Run(由 scheduler 触发)
  • Acquire(quotaKey) 成功后进入 running
  • 创建 attemptCtx = context.WithTimeout + cancel propagation
  • 执行业务函数,产生日志事件。
  • defer Release,并在结束时写 result + terminal status
  1. Wait(taskID)
  • 若已终态,立即返回 TaskResult
  • 若未终态,订阅事件并阻塞直到终态或 ctx.Done()
  1. Retry(taskID)
  • 仅允许对 failed 任务触发。
  • Attempt++,若超过 MaxRetries 返回 retry_exhausted
  • 状态回到 pending 并重新入队。
  • 返回值保持现有签名(可返回原 TaskID,接口兼容优先)。
  1. Cancel(taskID)
  • pending:直接置 killed
  • running:触发 cancel,等待执行协程收敛到 killed
  • 终态重复取消幂等返回成功(建议)或统一 invalid_transition(二选一,需文档固定)。

6. 事件流与日志增量语义

  • 事件顺序:同一 TaskID 内按写入顺序严格单调。
  • Stream(taskID)
    • 先回放已存在事件(避免错过早期状态)。
    • 再进入实时订阅。
    • 终态事件发出后可自动 close channel。
  • LogReader.ReadIncrement(taskID, offset, limit)
    • offset 为单调递增游标(建议 uint64 序号,不用字节偏移)。
    • 返回 items, nextOffset, hasMore
    • limit 缺省使用安全默认值(如 200)。

7. 并发与一致性策略

  • 任务索引:map[TaskID]*taskRecord + RWMutex
  • 单任务状态迁移在同一临界区完成:读取当前状态 -> 校验迁移 -> 写状态/时间戳 -> 追加事件
  • 事件追加必须先于 stream 推送,保证“落记录先于通知”。
  • Wait/Stream 不得持有全局锁阻塞,避免 head-of-line blocking。
  • 失败恢复最低保证:进程内一致;持久化桥接后保证重启可回放。

8. 子代理与配额 MVP 设计

  • SubAgentCoordinator.Spawn(spec):创建子任务并立即执行(同步模式)。
  • SubAgentCoordinator.Wait(taskID):等待子任务终态并返回结果。
  • 父子关系:
    • 子任务 Spec.ParentTaskID = parentID
    • 父任务取消时,向下传播取消。
  • 配额:
    • 默认 key 建议 session:{SessionID}task-kind:{Kind}
    • Acquire 失败立即失败,不进入 running。

9. 与现有代码的接入约束

  • agent.Runner 继续持有 runtime.TaskManager,不引入破坏式 API 变更。
  • tools.ExecutionContext.TaskManager 保持兼容。
  • app/bootstrap 先注入新的 runtime 实现,InMemoryTaskManager 退化为测试/兜底实现。

10. 测试矩阵(与 Phase 对齐)

  1. 状态机
  • 合法迁移表驱动测试。
  • 非法迁移返回 invalid_transition
  1. 执行控制
  • timeout 到 failed(timeout)
  • cancel 传播到 killed(cancelled)
  • retry 到达上限后 retry_exhausted
  1. 事件与日志
  • Stream 回放 + 实时顺序一致。
  • 终态事件后流关闭行为一致。
  • ReadIncrement offset 连续性、边界值(空日志、limit=0、大偏移)。
  1. 并发与配额
  • 同 key 并发争用下不超限。
  • 并发取消/重试竞争无数据竞争(go test -race)。

11. 迁移步骤(避免大爆炸)

  1. 先引入 state_machine + 错误码常量,替换当前直接写状态逻辑。
  2. Wait + Retry 最小闭环,保持 Submit/Get/Cancel/Stream 兼容。
  3. 引入 eventlog/log_reader,打通增量日志读取。
  4. 最后接入 subagent + quota 的同步 MVP。

12. 明确不在本次实现

  • 异步后台子代理归并。
  • 完整 worktree 隔离执行与冲突解算。
  • provider/policy 大重构。

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions