diff --git a/REVIEW_FINDINGS.md b/REVIEW_FINDINGS.md new file mode 100644 index 0000000..34675ee --- /dev/null +++ b/REVIEW_FINDINGS.md @@ -0,0 +1,358 @@ +# Roundtable v2 Code Review - 发现的问题 + +审查日期:2026-06-07 +审查范围:Agent Dispatch & Federated Meetings 升级 +变更规模:20 个文件,+2700/-76 行 + +--- + +## 🔴 严重问题 (Critical Issues) + +### 1. **Race Condition: 跨进程 Web 同步** +**位置**: `core.py:294-296`, `core.py:795-796` + +```python +# core.py line 294-296 +else: + # Publisher not in memory (cross-process) — update discussion.json directly + self._update_web_discussion_json(discussion_id, speech_data, disc.topic, participants) +``` + +**问题**: +- 当一个进程持有 `WebPublisher` 实例,另一个进程直接写 `discussion.json` 时会产生数据竞争 +- `WebPublisher` 可能缓存了旧状态,跨进程写入会被覆盖 +- 没有文件锁机制保护并发写入 + +**影响**: 数据丢失、状态不一致 +**建议**: +- 使用文件锁 (`fcntl.flock`) 或改用 SQLite 作为唯一数据源 +- 将 web viewer 改为纯读模式,从数据库实时查询 + +--- + +### 2. **SQL Injection 风险:动态 SQL 构建** +**位置**: `db.py:1036` + +```python +placeholders = ",".join("?" for _ in message_ids) +cur = conn.execute( + f"UPDATE agent_inbox SET read_at = ? WHERE id IN ({placeholders}) AND read_at IS NULL", + [now, *message_ids], +) +``` + +**问题**: +- 虽然使用了参数化查询,但如果 `message_ids` 是空列表,会生成无效 SQL +- 已有保护 (`if not message_ids: return 0`),但类似模式在其他地方可能遗漏 + +**影响**: 潜在的运行时错误 +**建议**: 添加单元测试覆盖边界情况(空列表、None 值) + +--- + +### 3. **幂等性键冲突处理不完整** +**位置**: `db.py:1146-1149` + +```python +if idempotency_key: + existing = self.get_dispatch(conn, idempotency_key=idempotency_key) + if existing: + return existing +``` + +**问题**: +- 仅在 `create_dispatch` 返回已存在的记录,但不检查状态是否匹配 +- 如果一个 dispatch 被 cancelled,幂等键会阻止重试 +- `create_summon` 也有类似问题 (`INSERT OR IGNORE`) + +**影响**: 无法重试失败的调度 +**建议**: +- 检查已存在 dispatch 的状态,如果是终态 (completed/cancelled) 则允许新建 +- 或者添加 `allow_retry` 参数明确控制行为 + +--- + +## 🟡 中等问题 (Medium Issues) + +### 4. **心跳超时硬编码** +**位置**: `db.py:967` + +```python +"online": (now - row["last_seen"]) < 90, +``` + +**问题**: +- 90 秒超时硬编码在多处 (`get_agent`, `list_agents`) +- 与 `list_agents` 的 `timeout_seconds` 参数不一致 + +**影响**: 配置不灵活,不同调用点逻辑不统一 +**建议**: 提取为常量 `DEFAULT_HEARTBEAT_TIMEOUT = 90` + +--- + +### 5. **缺少数据库索引** +**位置**: `schema.py:84-89` + +**问题**: +- `dispatches` 表缺少 `coordinator_agent_id` 索引 +- `summons` 表缺少 `(status, expires_at)` 组合索引(用于超时扫描) +- `agent_inbox` 表缺少 `(discussion_id, read_at)` 索引 + +**影响**: 高负载下查询性能降低 +**建议**: 添加迁移创建索引: +```sql +CREATE INDEX IF NOT EXISTS idx_dispatches_coordinator ON dispatches(coordinator_agent_id); +CREATE INDEX IF NOT EXISTS idx_summons_timeout ON summons(status, expires_at); +CREATE INDEX IF NOT EXISTS idx_inbox_discussion ON agent_inbox(discussion_id, read_at); +``` + +--- + +### 6. **WebPublisher 生命周期管理混乱** +**位置**: `core.py:782-790` + +```python +# Disk-backed viewer keeps serving via shared PM2 server; +# release the in-memory publisher to avoid leaking. +self._publishers.pop(discussion_id, None) +logger.info("Web publisher retained for concluded discussion %s", discussion_id) +``` + +**问题**: +- 注释说 "retained",但代码执行了 `pop` 移除 +- `_publishers` 字典只在 concluded 时清理,active/cancelled 可能泄漏 +- 跨进程场景下 `publisher` 可能为 None,但仍尝试调用方法 + +**影响**: 内存泄漏、空指针错误 +**建议**: +- 统一生命周期:要么全部 retain,要么全部 stop +- 添加 `__del__` 或使用 `weakref` 自动清理 + +--- + +### 7. **AgentDaemon 轮询效率低** +**位置**: `agent.py:99-101` + +```python +while True: + self.tick() + time.sleep(self.poll_interval) +``` + +**问题**: +- 即使没有消息也每 2 秒轮询一次 +- 无法在 inbox 为空时阻塞等待 +- 多 agent 运行时会产生 N*0.5 QPS 的无效查询 + +**影响**: CPU 和数据库资源浪费 +**建议**: +- 使用 `SELECT ... WHERE read_at IS NULL AND created_at > ?` 只查询新消息 +- 或改为事件驱动架构(webhook + SSE) + +--- + +### 8. **错误处理不一致** +**位置**: 多处 `handle_tool_call` + +**问题**: +- 有的返回 `{"error": "..."}` dict +- 有的抛出 `RoundtableError` 异常 +- HTTP bridge 需要检查 `"error" not in result`,脆弱 + +**影响**: 调用方难以统一处理错误 +**建议**: +- 统一错误协议:所有工具返回 `{"ok": bool, "error": str | None, ...}` +- 或者在 MCP 层统一捕获异常转换为标准格式 + +--- + +## 🟢 轻微问题 (Minor Issues) + +### 9. **类型注解缺失** +**位置**: `orchestrator.py`, `agent.py` + +**问题**: +- 新增的 `agent.py` 和 `orchestrator.py` 缺少完整类型注解 +- `ManagedOrchestrator.start_discussion` 返回类型未标注 +- 多处使用 `dict[str, Any]` 而非 TypedDict + +**影响**: IDE 提示不完整,运行时错误风险 +**建议**: 添加类型注解并用 mypy 检查 + +--- + +### 10. **日志级别使用不当** +**位置**: `generic.py:154`, `codex.py` 等 + +```python +logger.debug("Webhook delivery failed: %s", e) +``` + +**问题**: +- webhook 失败用 `debug` 级别,生产环境默认不可见 +- 但这是重要的集成失败,应该用 `warning` + +**影响**: 运维可观测性差 +**建议**: 调整为 `logger.warning("Webhook delivery failed: %s", e)` + +--- + +### 11. **测试覆盖率不足** +**位置**: `tests/test_orchestrator.py`, `tests/test_agent_runtime.py` + +**问题**: +- 新增 800+ 行数据库代码,只有 2 个测试文件 +- 缺少 dispatch readiness 状态机测试 +- 缺少超时和重试逻辑测试 +- 缺少并发场景测试 + +**影响**: 核心功能未充分验证 +**建议**: 补充测试覆盖: +- Dispatch 状态转换(pending → active → completed) +- Summon 超时和重试 +- 跨进程 web 同步冲突 +- Agent 心跳超时检测 + +--- + +### 12. **文档与实现不一致** +**位置**: `architecture.md:200` + +```markdown +### MCP Tools (21 个) +``` + +**问题**: +- 文档说 21 个工具,但 `tools.py` 实际只有 15 个左右 +- 新增的 `roundtable_summon_agents` 等工具未在文档详细说明 + +**影响**: 用户困惑 +**建议**: 更新架构文档,列出所有工具及用途 + +--- + +### 13. **Magic Numbers 散布** +**位置**: 多处 + +```python +timeout_seconds: int = 60 # dispatch 超时 +timeout_seconds: int = 90 # agent 超时 +poll_interval: float = 2.0 # 轮询间隔 +retry_timeout_seconds: int = 60 # 重试超时 +``` + +**问题**: +- 各种超时值散布在代码中 +- 没有统一配置入口 +- 难以根据部署环境调优 + +**影响**: 配置不灵活 +**建议**: 提取到配置类或环境变量 + +--- + +### 14. **HTTP Bridge 安全性** +**位置**: `codex.py:233-244`, `generic.py:288-299` + +**问题**: +- 认证 token 可选 (`auth_token: str | None = None`) +- 默认监听 `127.0.0.1`,但允许改为 `0.0.0.0` +- 没有速率限制,容易被滥用 + +**影响**: 生产环境暴露风险 +**建议**: +- 强制要求 auth_token(或生成随机 token) +- 添加 rate limiting (例如每分钟 60 次) +- 记录所有 POST 请求审计日志 + +--- + +### 15. **JSON 反序列化异常处理不统一** +**位置**: `db.py:664-672`, 多处 + +```python +@staticmethod +def _loads_json(raw: Any, default: Any = None) -> Any: + if raw is None or raw == "": + return default + if isinstance(raw, (dict, list)): + return raw + try: + return json.loads(raw) + except (TypeError, json.JSONDecodeError): + return default +``` + +**问题**: +- 静默吞掉所有解析错误 +- 不记录日志,调试困难 +- `default=None` 可能掩盖数据损坏 + +**影响**: 数据损坏时难以发现 +**建议**: 至少 log warning,或在开发模式抛出异常 + +--- + +## ✅ 做得好的地方 (Good Practices) + +1. **幂等性设计**: Dispatch 和 Summon 都支持 idempotency_key +2. **事件审计**: `summon_events` 表记录所有状态变更 +3. **Schema 迁移**: 使用 `PRAGMA user_version` 管理迁移 +4. **外键约束**: 启用 `PRAGMA foreign_keys=ON`,保证引用完整性 +5. **WAL 模式**: 使用 WAL journal mode 提升并发性能 +6. **分层架构**: Core/DB/Schema 分离清晰 +7. **测试友好**: 支持 `db_path` 参数,方便单元测试 + +--- + +## 📋 优先级建议 + +### 立即修复 (P0) +1. ⚠️ 跨进程 Web 同步竞争 (#1) +2. ⚠️ 幂等性键冲突处理 (#3) + +### 下个版本修复 (P1) +3. WebPublisher 生命周期 (#6) +4. 错误处理统一 (#8) +5. 补充测试覆盖 (#11) + +### 技术债务 (P2) +6. 数据库索引优化 (#5) +7. AgentDaemon 轮询优化 (#7) +8. 类型注解补全 (#9) +9. HTTP Bridge 安全加固 (#14) + +### 代码质量改进 (P3) +10. 心跳超时常量化 (#4) +11. 日志级别调整 (#10) +12. 文档更新 (#12) +13. 配置集中化 (#13) +14. JSON 反序列化日志 (#15) + +--- + +## 🎯 总体评价 + +**代码质量**: ⭐⭐⭐⭐ (4/5) +- 架构设计清晰,模块职责分明 +- 数据库设计合理,支持复杂查询 +- 代码风格统一,可读性良好 + +**测试覆盖**: ⭐⭐⭐ (3/5) +- 核心流程有测试,但覆盖不全面 +- 缺少边界情况和并发场景测试 +- 建议补充至少 10+ 个测试用例 + +**生产就绪度**: ⭐⭐⭐ (3/5) +- 功能完整,但存在竞争条件风险 +- 需要修复 P0 问题后才能上线 +- 建议在 staging 环境压测 + +**推荐行动**: +1. ✅ 修复跨进程同步问题(使用文件锁或移除 WebPublisher 状态缓存) +2. ✅ 补充测试覆盖(dispatch readiness, timeout, retry) +3. ✅ 添加数据库索引 +4. ⏳ 在 staging 环境运行 3-5 天,监控日志 +5. ⏳ 完成后再合并到 main 分支 + +**估计修复时间**: 2-3 天 diff --git a/docs/architecture.md b/docs/architecture.md index d4d2859..3091a34 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -18,7 +18,7 @@ │ ┌───────────────────────────────────────────────────────┐ │ │ │ │ MCP Server (roundtable.mcp) │ │ │ │ │ │ │ │ -│ │ Tools (15) │ Resources (4) │ Prompts (3) │◄───────────┘ │ +│ │ Tools (21) │ Resources (4) │ Prompts (3) │◄───────────┘ │ │ │ │ HTTP Bridge │ │ └───────────────────────────┬───────────────────────────┘ (port 8201) │ │ │ │ @@ -197,25 +197,31 @@ └──────────────────────────────────────────────────────────────────────┘ ``` -### MCP Tools (15 个) - -| 类别 | Tool | 用途 | -|------------|-------------------------------|-----------------------------------| -| Agent | `roundtable_register_agent` | 注册 agent,使其可被发现 | -| Agent | `roundtable_list_agents` | 查询在线 agent 列表 | -| Discussion | `roundtable_create` | 创建讨论 + 同步发邀请 | -| Discussion | `roundtable_list` | 列出所有讨论 | -| Invitation | `roundtable_invite` | 邀请 agent 加入 | -| Invitation | `roundtable_accept_invite` | 接受邀请并加入参与者 | -| Invitation | `roundtable_decline_invite` | 拒绝邀请 | -| Inbox | `roundtable_inbox` | 读取自己的消息(邀请 / 轮次提示) | -| Speech | `roundtable_speak` | 记录发言 | -| Speech | `roundtable_read` | 读取讨论历史 | -| Speech | `roundtable_wait_for_turn` | 检查是否轮到自己 | -| Round | `roundtable_advance` | 手动推进轮次 | -| Round | `roundtable_status` | 获取讨论状态 + 收敛度 | -| Conclude | `roundtable_summarize` | 获取结构化总结 | -| Conclude | `roundtable_end` | 结束讨论 | +### MCP Tools (21 个) + +| 类别 | Tool | 用途 | +|------------|-------------------------------|-------------------------------------------| +| Agent | `roundtable_register_agent` | 注册 agent、transport、skill 与可用性 | +| Agent | `roundtable_list_agents` | 按在线状态、skill、availability 查询 agent | +| Agent | `roundtable_heartbeat` | 运行时心跳,刷新 last_seen 与 availability | +| Dispatch | `roundtable_summon_agents` | 创建/使用 assembling 会议并召集 agent | +| Dispatch | `roundtable_dispatch_status` | 查询 dispatch/summon/readiness 状态 | +| Dispatch | `roundtable_retry_summon` | 重投递 pending/failed/timeout summon | +| Summon | `roundtable_accept_summon` | 接受召集并加入参与者 | +| Summon | `roundtable_decline_summon` | 拒绝召集 | +| Discussion | `roundtable_create` | 创建讨论 + 可选传统邀请 | +| Discussion | `roundtable_list` | 列出所有讨论,含 assembling | +| Invitation | `roundtable_invite` | 传统邀请 agent 加入 | +| Invitation | `roundtable_accept_invite` | 接受传统邀请并加入参与者 | +| Invitation | `roundtable_decline_invite` | 拒绝传统邀请 | +| Inbox | `roundtable_inbox` | 读取自己的消息(召集 / 邀请 / 轮次提示) | +| Speech | `roundtable_speak` | 记录发言 | +| Speech | `roundtable_read` | 读取讨论历史 | +| Speech | `roundtable_wait_for_turn` | 检查是否轮到自己 | +| Round | `roundtable_advance` | 手动推进轮次 | +| Round | `roundtable_status` | 获取讨论状态、收敛度与 dispatch 状态 | +| Conclude | `roundtable_summarize` | 获取结构化总结 | +| Conclude | `roundtable_end` | 结束讨论 | ### MCP Resources @@ -234,6 +240,31 @@ ## 多 Agent 邀请流程 ``` + +## 多 Agent 召集流程 + +``` + Coordinator MCP Server Registry/DB Agent Bridge + │ │ │ │ + │── roundtable_summon_agents ────────────────────►│ │ + │ topic / required_skill / min_accepts │ │ + │ │── create discussion │ │ + │ │ status='assembling' ──►│ │ + │ │── select online agents │ │ + │ │ by skill/availability │ │ + │ │── INSERT dispatch ──────►│ │ + │ │── INSERT summons ───────►│ │ + │ │── push_inbox(summon) ───►│ │ + │ │── POST /summon ───────────────────────────────────►│ + │ │ │◄── accept_summon ───────│ + │ │◄──────── roundtable_accept_summon ─────────────────│ + │ │── INSERT participant ───►│ │ + │ │── readiness quorum met ─►│ │ + │ │── discussion active ────►│ │ + │◄── dispatch/readiness/summons ──────────────────│ │ +``` + +召集是当前主路径:`agents.metadata.skills` 表示 agent 安装的 skill,`availability` 表示运行时状态,`summons` 记录单个 agent 的响应,`dispatches` 记录整次召集的启动策略。`start_policy` 支持 `immediate`、`quorum`、`all`、`timeout`;满足策略后,`assembling` 会议会被激活为 `active`。`roundtable_retry_summon` 复用既有 summon 行做重投递,不创建重复召集记录;显式 `allow_terminal_retry` 可在终态 dispatch 上释放旧 `idempotency_key` 并创建新 dispatch。 Coordinator MCP Server Inbox/DB Participant │ │ │ │ │── roundtable_create ─►│ │ │ @@ -277,6 +308,7 @@ │ HTTP :8201 │ │ WorkBuddy/... │ │ │ │ │ │ POST /invite │ │ │ + │ POST /summon │ │ │ │ POST /tool │ │ │ └─────┬────────────┘ └──────────────────┘ │ @@ -290,31 +322,52 @@ Bridge 模式的设计要点: - **共享状态总线**:所有平台都通过同一个 SQLite DB 协调,不需要中间消息队列 - **三种传输模式**:stdio(原生 MCP)/ HTTP(远程或非 MCP 平台)/ in-process(Hermes adapter) -- **解耦邀请与执行**:邀请进 inbox,agent 自己 poll,避免长连接和事件丢失 +- **解耦召集与执行**:召集/邀请都进 inbox;HTTP bridge 也可通过 `/summon` 自动接受 +- **运行时注册**:`GenericBridge`、`CodexBridge` 和 `AgentDaemon` 启动时注册 `agent-roundtable` skill,并通过 heartbeat 刷新在线状态 +- **可选桥接鉴权**:HTTP bridge 可配置 bearer token;MCP 投递 `/invite`、`/summon`、`/turn` 时会自动携带本地私有 token,公开 agent 列表会过滤该字段 + +## Web Viewer 状态 + +Web Viewer 的 `discussion.json` schema v3 额外包含 `dispatches` 与 `dispatch_summary`,用于展示 `assembling` 会议的召集进度、accepted/pending/failed/timeout 计数,以及 dispatch readiness。跨进程更新由 `web_sync.py` 从 SQLite 同步,避免 HTTP bridge 或 AgentDaemon 接受召集后 viewer 状态滞后。 + +## 调度模式 + +| 模式 | 实现入口 | 适用场景 | 启动方式 | +|-------------|--------------------------|----------------------------------|----------------------------------| +| Managed | `ManagedOrchestrator` | 同平台、同进程或宿主可直接控制 agent | 直接创建 active discussion | +| Federated | `FederatedOrchestrator` | 跨平台、跨进程、HTTP/stdio 混合 agent | registry + summon + heartbeat | + +两种模式共享同一套 `discussions`、`participants`、`dispatches`、`summons` 状态模型。Managed Mode 不依赖 agent registry 选人;Federated Mode 通过 skill、availability、online heartbeat 做发现和召集。 -## 数据库 Schema (v2) +## 数据库 Schema (v4) ``` discussions ──┬─► participants ├─► speeches ──► findings ├─► convergence_history - └─► invitations ◄─── agents + ├─► invitations ◄─── agents + └─► dispatches ──► summons ──► summon_events + ▲ │ - └─► agent_inbox + agent_inbox ``` -| 表 | 关键字段 | v2 新增 | -|-----------------|-----------------------------------------------|---------| -| discussions | id, topic, status, current_round, max_rounds | | -| participants | discussion_id + participant (PK), role | | -| speeches | id, discussion_id, round, participant | | -| findings | type (consensus/disagreement/new_point) | | -| convergence | discussion_id + round (PK), score | | -| **agents** | agent_id (PK), platform, transport, endpoint | ✓ | -| **agent_inbox** | id, agent_id, type, payload, read_at | ✓ | -| **invitations** | discussion_id + agent_id (UNIQUE), status | ✓ | +| 表 | 关键字段 | 说明 | +|-------------------|------------------------------------------------------|------| +| discussions | id, topic, status, current_round, max_rounds | status 支持 `assembling` | +| participants | discussion_id + participant (PK), role | 接受 invite/summon 后加入 | +| speeches | id, discussion_id, round, participant | 发言记录 | +| findings | type (consensus/disagreement/new_point) | 结构化观点 | +| convergence | discussion_id + round (PK), score | 收敛度历史 | +| agents | agent_id (PK), platform, transport, endpoint, metadata | metadata 含 skills/availability | +| agent_inbox | id, agent_id, type, payload, read_at | summon/invite/turn 消息 | +| invitations | discussion_id + agent_id (UNIQUE), status | 传统邀请路径 | +| dispatches | id, discussion_id, mode, start_policy, status | 一次召集/调度 | +| summons | id, dispatch_id, agent_id, status, expires_at | 单 agent 召集生命周期 | +| summon_events | summon_id, dispatch_id, event, payload | 可审计事件流 | 迁移由 `PRAGMA user_version` + `_MIGRATIONS` 列表管理;新增迁移仅需 append 函数。 +v4 在 v3 召集表基础上新增查询索引:`idx_dispatches_coordinator`、`idx_summons_timeout`、`idx_inbox_discussion`,用于协调者过滤、过期 summon 扫描和 discussion 维度 inbox 同步。 ## 数据流 @@ -353,6 +406,7 @@ discussions ──┬─► participants | **HTTP/SSE MCP** | `python -m roundtable.mcp --http --port` | 跨主机、远程 agent | | **In-process (Hermes)** | `from roundtable.adapters.hermes import…` | 嵌入到 Hermes 主程序 | | **HTTP Bridge (Codex)** | `python -m roundtable.codex` (port 8201) | OpenAI Codex CLI 等非 MCP 平台 | +| **Agent Daemon** | `python -m roundtable.agent --agent-id ... --platform ...` | stdio/轮询型 agent runtime | ## 测试覆盖率 (v0.1.0a1) diff --git a/src/roundtable/__init__.py b/src/roundtable/__init__.py index d03716d..4f27d2a 100644 --- a/src/roundtable/__init__.py +++ b/src/roundtable/__init__.py @@ -44,6 +44,7 @@ Speech, ) from roundtable.notify import Notifier +from roundtable.orchestrator import FederatedOrchestrator, ManagedOrchestrator from roundtable.web_publisher import WebPublisher from roundtable.webhook import WebhookSender @@ -89,11 +90,13 @@ def list_adapters() -> dict[str, type[RoundtableAdapter]]: "Discussion", "DiscussionNotActiveError", "DiscussionNotFoundError", + "FederatedOrchestrator", "Finding", "InvalidFindingTypeError", "InvalidParticipantError", "InvalidReplyToError", "InvalidSpeechOrderError", + "ManagedOrchestrator", "Notifier", "Participant", "Roundtable", diff --git a/src/roundtable/agent.py b/src/roundtable/agent.py new file mode 100644 index 0000000..60a0968 --- /dev/null +++ b/src/roundtable/agent.py @@ -0,0 +1,177 @@ +"""Runtime daemon for registered Roundtable agents. + +The daemon is intentionally small: it registers an agent, refreshes heartbeat +state, polls inbox messages, and accepts summons/invitations according to the +configured policy. Speech generation remains the responsibility of the host app. +""" + +from __future__ import annotations + +import argparse +import logging +import time +from typing import Any + +from roundtable.core import RoundtableCore +from roundtable.db import RoundtableDB +from roundtable.mcp.tools import handle_tool_call + +logger = logging.getLogger(__name__) + + +class AgentDaemon: + """Register and keep a non-HTTP agent available for summons.""" + + def __init__( + self, + *, + agent_id: str, + platform: str, + db: RoundtableDB | None = None, + display_name: str | None = None, + skills: list[str] | None = None, + capabilities: list[str] | None = None, + availability: str = "idle", + accept_policy: str = "auto", + transport: str = "stdio", + endpoint: str | None = None, + poll_interval: float = 2.0, + ): + self.agent_id = agent_id + self.platform = platform + self.db = db or RoundtableDB() + self.core = RoundtableCore(db=self.db) + self.display_name = display_name or agent_id + self.skills = skills or ["agent-roundtable"] + self.capabilities = capabilities or ["speak", "listen"] + self.availability = availability + self.accept_policy = accept_policy + self.transport = transport + self.endpoint = endpoint + self.poll_interval = max(0.1, float(poll_interval)) + + def register(self) -> dict[str, Any]: + return handle_tool_call( + self.core, + self.db, + "roundtable_register_agent", + { + "agent_id": self.agent_id, + "platform": self.platform, + "display_name": self.display_name, + "capabilities": self.capabilities, + "transport": self.transport, + "endpoint": self.endpoint, + "skills": self.skills, + "availability": self.availability, + "accept_policy": self.accept_policy, + }, + ) + + def heartbeat(self) -> dict[str, Any]: + return handle_tool_call( + self.core, + self.db, + "roundtable_heartbeat", + { + "agent_id": self.agent_id, + "availability": self.availability, + "metadata": {"skills": self.skills, "accept_policy": self.accept_policy}, + }, + ) + + def tick(self) -> dict[str, Any]: + """Run one deterministic daemon cycle.""" + self.heartbeat() + inbox = handle_tool_call( + self.core, + self.db, + "roundtable_inbox", + {"agent_id": self.agent_id, "unread_only": True, "mark_read": True}, + ) + messages = inbox.get("messages", []) + handled = [self._handle_message(message) for message in messages] + return {"agent_id": self.agent_id, "messages": messages, "handled": handled} + + def run_forever(self) -> None: + self.register() + logger.info("Roundtable agent daemon registered: %s", self.agent_id) + while True: + self.tick() + time.sleep(self.poll_interval) + + def _handle_message(self, message: dict[str, Any]) -> dict[str, Any]: + payload = message.get("payload") or {} + msg_type = message.get("type") + if self.accept_policy != "auto": + return {"message_id": message.get("id"), "status": "ignored", "reason": "manual_accept_required"} + + if msg_type == "summon" and payload.get("discussion_id"): + result = handle_tool_call( + self.core, + self.db, + "roundtable_accept_summon", + { + "discussion_id": payload["discussion_id"], + "agent_id": self.agent_id, + "metadata": {"source": "agent_daemon", "summon_id": payload.get("summon_id")}, + }, + ) + return {"message_id": message.get("id"), "type": msg_type, "result": result} + + if msg_type == "invitation" and payload.get("discussion_id"): + result = handle_tool_call( + self.core, + self.db, + "roundtable_accept_invite", + {"discussion_id": payload["discussion_id"], "agent_id": self.agent_id}, + ) + return {"message_id": message.get("id"), "type": msg_type, "result": result} + + return {"message_id": message.get("id"), "status": "ignored", "type": msg_type} + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(description="Roundtable agent daemon") + parser.add_argument("--agent-id", required=True, help="Unique agent identifier") + parser.add_argument("--platform", required=True, help="Platform name, e.g. codex or claude-code") + parser.add_argument("--db", type=str, default=None, help="SQLite database path") + parser.add_argument("--display-name", default=None, help="Human-readable agent name") + parser.add_argument("--skill", action="append", dest="skills", help="Installed skill; can be repeated") + parser.add_argument("--availability", default="idle", help="Runtime availability") + parser.add_argument("--accept-policy", default="auto", choices=["auto", "manual", "never"], help="Summon policy") + parser.add_argument("--transport", default="stdio", choices=["stdio", "http"], help="Agent transport") + parser.add_argument("--endpoint", default=None, help="HTTP endpoint when transport=http") + parser.add_argument("--poll-interval", type=float, default=2.0, help="Inbox polling interval in seconds") + parser.add_argument("--once", action="store_true", help="Run one heartbeat/inbox cycle and exit") + parser.add_argument("--log-level", default="INFO", help="Logging level") + return parser + + +def main() -> None: + parser = build_parser() + args = parser.parse_args() + logging.basicConfig(level=getattr(logging, args.log_level.upper(), logging.INFO)) + + daemon = AgentDaemon( + agent_id=args.agent_id, + platform=args.platform, + db=RoundtableDB(args.db), + display_name=args.display_name, + skills=args.skills or ["agent-roundtable"], + availability=args.availability, + accept_policy=args.accept_policy, + transport=args.transport, + endpoint=args.endpoint, + poll_interval=args.poll_interval, + ) + registration = daemon.register() + if args.once: + result = daemon.tick() + print({"registration": registration, "tick": result}) + return + daemon.run_forever() + + +if __name__ == "__main__": + main() diff --git a/src/roundtable/codex.py b/src/roundtable/codex.py index 2b22745..3bcab87 100644 --- a/src/roundtable/codex.py +++ b/src/roundtable/codex.py @@ -15,6 +15,7 @@ def build_parser() -> argparse.ArgumentParser: parser.add_argument("--host", default="127.0.0.1", help="HTTP host (default: 127.0.0.1)") parser.add_argument("--display-name", default="Codex Agent", help="Human-readable agent name") parser.add_argument("--db", type=str, default=None, help="SQLite database path") + parser.add_argument("--auth-token", default=None, help="Optional bearer token required by bridge endpoints") parser.add_argument("--log-level", default="INFO", help="Logging level") return parser @@ -31,6 +32,7 @@ def main() -> None: host=args.host, display_name=args.display_name, db_path=args.db, + auth_token=args.auth_token, ) bridge.start() diff --git a/src/roundtable/core.py b/src/roundtable/core.py index eed4faa..05057fe 100644 --- a/src/roundtable/core.py +++ b/src/roundtable/core.py @@ -81,6 +81,7 @@ def create_discussion( created_by: str = "coordinator", output_path: str | None = None, notifications: dict[str, Any] | None = None, + status: str = "active", web: bool = True, web_port: int = 8199, expires_at: float | None = None, @@ -114,9 +115,11 @@ def create_discussion( speech_order = tpl["speech_order"] if not topic or not topic.strip(): raise ValueError("topic is required") - if not participants or not isinstance(participants, list): + if participants is None or not isinstance(participants, list): raise ValueError("participants must be a non-empty array of objects") - if len(participants) < 2: + if status != "assembling" and not participants: + raise ValueError("participants must be a non-empty array of objects") + if status != "assembling" and len(participants) < 2: raise ValueError("At least 2 participants are required for a discussion") if notifications is not None: @@ -141,6 +144,7 @@ def create_discussion( created_by=created_by, output_path=output_path, notifications=notifications, + status=status, ) # Optionally start web viewer @@ -165,6 +169,7 @@ def create_discussion( } for p in participants ], + status=disc.status, expires_at=expires_at, ) except Exception as exc: @@ -475,6 +480,24 @@ def status(self, discussion_id: str) -> dict[str, Any]: if not disc: raise DiscussionNotFoundError(f"Discussion {discussion_id} not found") + dispatches = self.db.list_dispatches(conn, discussion_id=discussion_id) + dispatch_status = [] + for dispatch in dispatches: + readiness_result = self.db.apply_dispatch_readiness(conn, dispatch["id"]) + updated_dispatch = readiness_result.get("dispatch") or dispatch + dispatch_status.append( + { + "dispatch": updated_dispatch, + "readiness": readiness_result.get("readiness"), + "summons": self.db.get_summons(conn, dispatch_id=dispatch["id"]), + "discussion_activated": readiness_result.get("discussion_activated", False), + } + ) + if any(item.get("discussion_activated") for item in dispatch_status): + refreshed = self.db.get_discussion(conn, discussion_id) + if refreshed: + disc = refreshed + participants = self.db.get_participants(conn, discussion_id) speech_count = self.db.get_speech_count(conn, discussion_id) findings = self.db.get_findings(conn, discussion_id) @@ -513,6 +536,7 @@ def status(self, discussion_id: str) -> dict[str, Any]: "speech_count": speech_count, "participant_count": len(participants), "next_speaker": next_speaker, + "dispatches": dispatch_status, "convergence_history": [ { "round": c.round, @@ -658,6 +682,16 @@ def end_discussion( if not disc: raise DiscussionNotFoundError(f"Discussion {discussion_id} not found") if disc.status != "active": + if force and disc.status == "assembling": + ok = self.db.cancel_discussion(conn, discussion_id) + self._emit("discussion_ended", discussion_id=discussion_id, action="cancelled") + return { + "ok": True, + "discussion_id": discussion_id, + "action": "cancelled", + "success": ok, + "web_retained": False, + } if disc.status == "concluded" and not force and conclusion is not None: conn.execute( "UPDATE discussions SET conclusion = ? WHERE id = ?", @@ -671,7 +705,7 @@ def end_discussion( self._build_output_markdown(conn, disc_after, conclusion_override=conclusion), ) - publisher = self._publishers.get(discussion_id) + publisher = self._publishers.pop(discussion_id, None) web_retained = False if publisher: try: @@ -686,6 +720,7 @@ def end_discussion( ) publisher.conclude(conclusion) web_retained = True + logger.info("Web publisher retained for concluded discussion %s", discussion_id) except (OSError, RuntimeError, ValueError): logger.exception("Web publisher conclude update failed for %s", discussion_id) else: @@ -729,7 +764,7 @@ def end_discussion( # Web publisher hook. A concluded viewer remains online for post-meeting # review; force-cancel still stops it immediately. - publisher = self._publishers.get(discussion_id) + publisher = self._publishers.pop(discussion_id, None) web_retained = False if publisher: try: @@ -744,12 +779,8 @@ def end_discussion( ) publisher.conclude(disc_after.conclusion or "") web_retained = True - # Disk-backed viewer keeps serving via shared PM2 server; - # release the in-memory publisher to avoid leaking. - self._publishers.pop(discussion_id, None) logger.info("Web publisher retained for concluded discussion %s", discussion_id) else: - self._publishers.pop(discussion_id, None) publisher.stop() logger.info("Web publisher stopped for %s", discussion_id) except (OSError, RuntimeError, ValueError): diff --git a/src/roundtable/db.py b/src/roundtable/db.py index 0f78006..8e86cfa 100644 --- a/src/roundtable/db.py +++ b/src/roundtable/db.py @@ -31,12 +31,16 @@ ) from roundtable.schema import ( SCHEMA_SQL, + VALID_DISCUSSION_STATUSES, + VALID_DISPATCH_MODES, + VALID_DISPATCH_STATUSES, VALID_FINDING_TYPES, VALID_SPEECH_ORDERS, + VALID_SUMMON_STATUSES, migrate_db, ) -# Constants and SCHEMA_SQL are imported from roundtable.schema +DEFAULT_HEARTBEAT_TIMEOUT_SECONDS = 90 # --------------------------------------------------------------------------- @@ -104,12 +108,15 @@ def create_discussion( created_by: str = "unknown", output_path: str | None = None, notifications: dict[str, Any] | None = None, + status: str = "active", ) -> Discussion: + if status not in VALID_DISCUSSION_STATUSES: + raise ValueError(f"Invalid discussion status: {status}") if speech_order not in VALID_SPEECH_ORDERS: raise InvalidSpeechOrderError(f"Invalid speech_order: {speech_order}") if max_rounds < 1: raise ValueError("max_rounds must be >= 1") - if not participants: + if not participants and status != "assembling": raise ValueError("At least one participant is required") seen_profiles: set[str] = set() for p in participants: @@ -130,8 +137,8 @@ def create_discussion( """INSERT INTO discussions (id, topic, context, status, max_rounds, current_round, speech_order, created_by, created_at, output_path, notifications) - VALUES (?, ?, ?, 'active', ?, 0, ?, ?, ?, ?, ?)""", - (disc_id, topic, context, max_rounds, speech_order, created_by, now, output_path, notif_json), + VALUES (?, ?, ?, ?, ?, 0, ?, ?, ?, ?, ?)""", + (disc_id, topic, context, status, max_rounds, speech_order, created_by, now, output_path, notif_json), ) for p in participants: profile = p.get("profile", "").strip() @@ -151,7 +158,7 @@ def create_discussion( id=disc_id, topic=topic, context=context, - status="active", + status=status, max_rounds=max_rounds, current_round=0, speech_order=speech_order, @@ -170,6 +177,14 @@ def get_discussion(self, conn: sqlite3.Connection, discussion_id: str) -> Discus return None return self._row_to_discussion(row) + def activate_discussion(self, conn: sqlite3.Connection, discussion_id: str) -> bool: + """Move an assembling discussion into the active speaking lifecycle.""" + cur = conn.execute( + "UPDATE discussions SET status = 'active' WHERE id = ? AND status = 'assembling'", + (discussion_id,), + ) + return cur.rowcount > 0 + def list_discussions( self, conn: sqlite3.Connection, @@ -213,7 +228,7 @@ def cancel_discussion(self, conn: sqlite3.Connection, discussion_id: str) -> boo cur = conn.execute( """UPDATE discussions SET status = 'cancelled', concluded_at = ? - WHERE id = ? AND status = 'active'""", + WHERE id = ? AND status IN ('assembling', 'active')""", (now, discussion_id), ) return cur.rowcount > 0 @@ -222,6 +237,25 @@ def cancel_discussion(self, conn: sqlite3.Connection, discussion_id: str) -> boo # Participants # ------------------------------------------------------------------ + def add_participant( + self, + conn: sqlite3.Connection, + discussion_id: str, + participant: str, + *, + role: str | None = None, + perspective: str | None = None, + display_name: str | None = None, + ) -> bool: + now = int(time.time()) + cur = conn.execute( + """INSERT OR IGNORE INTO participants + (discussion_id, participant, role, perspective, display_name, joined_at, is_active) + VALUES (?, ?, ?, ?, ?, ?, 1)""", + (discussion_id, participant, role, perspective, display_name or participant, now), + ) + return cur.rowcount > 0 + def get_participants(self, conn: sqlite3.Connection, discussion_id: str) -> list[Participant]: rows = conn.execute( "SELECT * FROM participants WHERE discussion_id = ? ORDER BY joined_at", @@ -626,6 +660,86 @@ def calculate_convergence(self, conn: sqlite3.Connection, discussion_id: str, ro # Helpers # ------------------------------------------------------------------ + @staticmethod + def _loads_json(raw: Any, default: Any = None) -> Any: + if raw is None or raw == "": + return default + if isinstance(raw, (dict, list)): + return raw + try: + return json.loads(raw) + except (TypeError, json.JSONDecodeError): + return default + + @classmethod + def _normalize_agent_metadata(cls, metadata: dict[str, Any] | None) -> dict[str, Any]: + if not metadata: + return {} + normalized = dict(metadata) + skills = normalized.get("skills") + if skills is None: + normalized["skills"] = [] + elif isinstance(skills, str): + normalized["skills"] = [skills] + else: + normalized["skills"] = [str(skill) for skill in skills if skill] + skill_versions = normalized.get("skill_versions") + if not isinstance(skill_versions, dict): + normalized["skill_versions"] = {} + roles = normalized.get("roles") + if roles is None: + normalized["roles"] = [] + elif isinstance(roles, str): + normalized["roles"] = [roles] + else: + normalized["roles"] = [str(role) for role in roles if role] + if "availability" in normalized and normalized["availability"] is not None: + normalized["availability"] = str(normalized["availability"]) + if "accept_policy" in normalized and normalized["accept_policy"] is not None: + normalized["accept_policy"] = str(normalized["accept_policy"]) + return normalized + + @classmethod + def _merge_agent_metadata( + cls, + existing: dict[str, Any] | None, + incoming: dict[str, Any] | None, + ) -> dict[str, Any]: + merged = cls._normalize_agent_metadata(existing) + if not incoming: + return merged + normalized_incoming = cls._normalize_agent_metadata(incoming) + for key, value in normalized_incoming.items(): + if key == "skills": + merged[key] = sorted(set(cls._agent_skills(merged)) | set(cls._agent_skills(normalized_incoming))) + elif key == "skill_versions": + versions = dict(merged.get("skill_versions") or {}) + versions.update(value or {}) + merged[key] = versions + elif key == "roles": + roles = set(merged.get("roles") or []) + roles.update(value or []) + merged[key] = sorted(roles) + else: + merged[key] = value + return merged + + @staticmethod + def _agent_skills(metadata: dict[str, Any] | None) -> list[str]: + if not metadata: + return [] + skills = metadata.get("skills", []) + if isinstance(skills, str): + return [skills] + return [str(skill) for skill in skills if skill] + + @staticmethod + def _public_agent_metadata(metadata: dict[str, Any] | None) -> dict[str, Any]: + public = dict(metadata or {}) + for key in ("auth_token", "bridge_auth_token", "_bridge_auth_token", "roundtable_auth_token"): + public.pop(key, None) + return public + @staticmethod def _row_to_discussion(row: sqlite3.Row) -> Discussion: notif_raw = row["notifications"] @@ -650,6 +764,48 @@ def _row_to_discussion(row: sqlite3.Row) -> Discussion: notifications=notif, ) + @classmethod + def _row_to_dispatch(cls, row: sqlite3.Row) -> dict[str, Any]: + return { + "id": row["id"], + "discussion_id": row["discussion_id"], + "mode": row["mode"], + "coordinator_agent_id": row["coordinator_agent_id"], + "start_policy": row["start_policy"], + "min_accepts": row["min_accepts"], + "timeout_seconds": row["timeout_seconds"], + "status": row["status"], + "idempotency_key": row["idempotency_key"], + "created_at": row["created_at"], + "updated_at": row["updated_at"], + "started_at": row["started_at"], + "completed_at": row["completed_at"], + "metadata": cls._loads_json(row["metadata"], default={}), + } + + @classmethod + def _row_to_summon(cls, row: sqlite3.Row) -> dict[str, Any]: + return { + "id": row["id"], + "dispatch_id": row["dispatch_id"], + "discussion_id": row["discussion_id"], + "agent_id": row["agent_id"], + "role": row["role"], + "perspective": row["perspective"], + "required_skill": row["required_skill"], + "status": row["status"], + "invited_by": row["invited_by"], + "transport": row["transport"], + "endpoint": row["endpoint"], + "delivery_result": cls._loads_json(row["delivery_result"], default={}), + "idempotency_key": row["idempotency_key"], + "created_at": row["created_at"], + "delivered_at": row["delivered_at"], + "responded_at": row["responded_at"], + "expires_at": row["expires_at"], + "metadata": cls._loads_json(row["metadata"], default={}), + } + # ------------------------------------------------------------------ # Agents (MCP multi-agent support) # ------------------------------------------------------------------ @@ -668,6 +824,11 @@ def upsert_agent( metadata: dict[str, Any] | None = None, ) -> dict[str, Any]: now = int(time.time()) + existing_agent = self.get_agent(conn, agent_id, include_private=True) + if existing_agent: + normalized_metadata = self._merge_agent_metadata(existing_agent.get("metadata"), metadata) + else: + normalized_metadata = self._normalize_agent_metadata(metadata) conn.execute( """INSERT INTO agents (agent_id, platform, display_name, persona, capabilities, transport, endpoint, last_seen, metadata) @@ -690,21 +851,59 @@ def upsert_agent( transport, endpoint, now, - json.dumps(metadata) if metadata else None, + json.dumps(normalized_metadata) if normalized_metadata else None, ), ) return {"agent_id": agent_id, "last_seen": now} - def touch_agent(self, conn: sqlite3.Connection, agent_id: str) -> None: + def touch_agent( + self, + conn: sqlite3.Connection, + agent_id: str, + *, + availability: str | None = None, + metadata: dict[str, Any] | None = None, + ) -> None: now = int(time.time()) - conn.execute("UPDATE agents SET last_seen = ? WHERE agent_id = ?", (now, agent_id)) + agent = self.get_agent(conn, agent_id, include_private=True) + if not agent: + return + merged_metadata = self._merge_agent_metadata(agent.get("metadata"), metadata) + if availability: + merged_metadata["availability"] = availability + merged_metadata["last_heartbeat"] = now + conn.execute( + "UPDATE agents SET last_seen = ?, metadata = ? WHERE agent_id = ?", + (now, json.dumps(merged_metadata), agent_id), + ) + + def heartbeat_agent( + self, + conn: sqlite3.Connection, + agent_id: str, + *, + availability: str | None = None, + metadata: dict[str, Any] | None = None, + ) -> dict[str, Any]: + self.touch_agent(conn, agent_id, availability=availability, metadata=metadata) + agent = self.get_agent(conn, agent_id) + if not agent: + return {"error": f"Agent {agent_id} is not registered"} + return { + "agent_id": agent_id, + "last_seen": agent["last_seen"], + "online": agent["online"], + "metadata": agent.get("metadata"), + } def list_agents( self, conn: sqlite3.Connection, *, online_only: bool = False, - timeout_seconds: int = 90, + timeout_seconds: int = DEFAULT_HEARTBEAT_TIMEOUT_SECONDS, + required_skill: str | None = None, + availability: str | None = None, ) -> list[dict[str, Any]]: if online_only: cutoff = int(time.time()) - timeout_seconds @@ -717,38 +916,60 @@ def list_agents( results = [] now = int(time.time()) for r in rows: + metadata = self._loads_json(r["metadata"], default={}) + public_metadata = self._public_agent_metadata(metadata) + skills = self._agent_skills(metadata) + if required_skill and required_skill not in skills: + continue + if availability and metadata.get("availability") != availability: + continue results.append( { "agent_id": r["agent_id"], "platform": r["platform"], "display_name": r["display_name"], - "persona": json.loads(r["persona"]) if r["persona"] else None, - "capabilities": json.loads(r["capabilities"]) if r["capabilities"] else None, + "persona": self._loads_json(r["persona"]), + "capabilities": self._loads_json(r["capabilities"]), "transport": r["transport"], "endpoint": r["endpoint"], "last_seen": r["last_seen"], "online": (now - r["last_seen"]) < timeout_seconds, - "metadata": json.loads(r["metadata"]) if r["metadata"] else None, + "metadata": public_metadata, + "skills": skills, + "availability": metadata.get("availability"), + "accept_policy": metadata.get("accept_policy"), } ) return results - def get_agent(self, conn: sqlite3.Connection, agent_id: str) -> dict[str, Any] | None: + def get_agent( + self, + conn: sqlite3.Connection, + agent_id: str, + *, + include_private: bool = False, + timeout_seconds: int = DEFAULT_HEARTBEAT_TIMEOUT_SECONDS, + ) -> dict[str, Any] | None: row = conn.execute("SELECT * FROM agents WHERE agent_id = ?", (agent_id,)).fetchone() if not row: return None now = int(time.time()) + metadata = self._loads_json(row["metadata"], default={}) + result_metadata = metadata if include_private else self._public_agent_metadata(metadata) return { "agent_id": row["agent_id"], "platform": row["platform"], "display_name": row["display_name"], - "persona": json.loads(row["persona"]) if row["persona"] else None, - "capabilities": json.loads(row["capabilities"]) if row["capabilities"] else None, + "persona": self._loads_json(row["persona"]), + "capabilities": self._loads_json(row["capabilities"]), "transport": row["transport"], "endpoint": row["endpoint"], "last_seen": row["last_seen"], - "online": (now - row["last_seen"]) < 90, - "metadata": json.loads(row["metadata"]) if row["metadata"] else None, + "online": (now - row["last_seen"]) < timeout_seconds, + "metadata": result_metadata, + "skills": self._agent_skills(metadata), + "availability": metadata.get("availability"), + "accept_policy": metadata.get("accept_policy"), } # ------------------------------------------------------------------ @@ -899,3 +1120,701 @@ def get_invitations( } for r in rows ] + + # ------------------------------------------------------------------ + # Dispatches / Summons + # ------------------------------------------------------------------ + + def create_dispatch( + self, + conn: sqlite3.Connection, + discussion_id: str, + coordinator_agent_id: str, + *, + mode: str = "federated", + start_policy: str = "quorum", + min_accepts: int = 1, + timeout_seconds: int = 60, + idempotency_key: str | None = None, + allow_terminal_retry: bool = False, + metadata: dict[str, Any] | None = None, + ) -> dict[str, Any]: + if mode not in VALID_DISPATCH_MODES: + raise ValueError(f"Invalid dispatch mode: {mode}") + if start_policy not in {"immediate", "quorum", "all", "timeout"}: + raise ValueError(f"Invalid start_policy: {start_policy}") + if not self.get_discussion(conn, discussion_id): + raise DiscussionNotFoundError(f"Discussion {discussion_id} not found") + if idempotency_key: + existing = self.get_dispatch(conn, idempotency_key=idempotency_key) + if existing: + if allow_terminal_retry and existing["status"] in {"completed", "cancelled", "timeout"}: + released_key = f"{idempotency_key}#released:{existing['id']}" + conn.execute( + "UPDATE dispatches SET idempotency_key = ? WHERE id = ? AND idempotency_key = ?", + (released_key, existing["id"], idempotency_key), + ) + self.record_summon_event( + conn, + summon_id=None, + dispatch_id=existing["id"], + agent_id=existing.get("coordinator_agent_id"), + event="dispatch.idempotency_key.released", + payload={ + "idempotency_key": idempotency_key, + "released_key": released_key, + "previous_status": existing["status"], + }, + ) + else: + return existing + + now = int(time.time()) + dispatch_id = f"dp_{secrets.token_hex(6)}" + conn.execute( + """INSERT INTO dispatches + (id, discussion_id, mode, coordinator_agent_id, start_policy, + min_accepts, timeout_seconds, status, idempotency_key, + created_at, updated_at, metadata) + VALUES (?, ?, ?, ?, ?, ?, ?, 'pending', ?, ?, ?, ?)""", + ( + dispatch_id, + discussion_id, + mode, + coordinator_agent_id, + start_policy, + max(0, int(min_accepts)), + max(0, int(timeout_seconds)), + idempotency_key, + now, + now, + json.dumps(metadata) if metadata else None, + ), + ) + self.record_summon_event( + conn, + summon_id=None, + dispatch_id=dispatch_id, + agent_id=coordinator_agent_id, + event="dispatch.created", + payload={"mode": mode, "start_policy": start_policy, "min_accepts": min_accepts}, + ) + dispatch = self.get_dispatch(conn, dispatch_id) + if not dispatch: + raise RuntimeError("Failed to create dispatch") + return dispatch + + def get_dispatch( + self, + conn: sqlite3.Connection, + dispatch_id: str | None = None, + *, + idempotency_key: str | None = None, + ) -> dict[str, Any] | None: + if dispatch_id: + row = conn.execute("SELECT * FROM dispatches WHERE id = ?", (dispatch_id,)).fetchone() + elif idempotency_key: + row = conn.execute( + "SELECT * FROM dispatches WHERE idempotency_key = ?", + (idempotency_key,), + ).fetchone() + else: + raise ValueError("dispatch_id or idempotency_key is required") + return self._row_to_dispatch(row) if row else None + + def list_dispatches( + self, + conn: sqlite3.Connection, + *, + discussion_id: str | None = None, + status: str | None = None, + ) -> list[dict[str, Any]]: + query = "SELECT * FROM dispatches WHERE 1=1" + params: list[Any] = [] + if discussion_id: + query += " AND discussion_id = ?" + params.append(discussion_id) + if status: + if status not in VALID_DISPATCH_STATUSES: + raise ValueError(f"Invalid dispatch status: {status}") + query += " AND status = ?" + params.append(status) + query += " ORDER BY created_at DESC" + rows = conn.execute(query, params).fetchall() + return [self._row_to_dispatch(row) for row in rows] + + def update_dispatch_status( + self, + conn: sqlite3.Connection, + dispatch_id: str, + status: str, + *, + started_at: int | None = None, + completed_at: int | None = None, + ) -> dict[str, Any] | None: + if status not in VALID_DISPATCH_STATUSES: + raise ValueError(f"Invalid dispatch status: {status}") + now = int(time.time()) + conn.execute( + """UPDATE dispatches + SET status = ?, + updated_at = ?, + started_at = COALESCE(?, started_at), + completed_at = COALESCE(?, completed_at) + WHERE id = ?""", + (status, now, started_at, completed_at, dispatch_id), + ) + return self.get_dispatch(conn, dispatch_id) + + def reopen_dispatch_for_retry( + self, + conn: sqlite3.Connection, + dispatch_id: str, + *, + retry_timeout_seconds: int = 60, + ) -> dict[str, Any] | None: + dispatch = self.get_dispatch(conn, dispatch_id) + if not dispatch: + return None + if dispatch["status"] in {"completed", "cancelled"}: + return dispatch + + now = int(time.time()) + retry_timeout_seconds = max(0, int(retry_timeout_seconds)) + elapsed = max(0, now - int(dispatch["created_at"])) + timeout_seconds = max(int(dispatch.get("timeout_seconds") or 0), elapsed + retry_timeout_seconds) + conn.execute( + """UPDATE dispatches + SET status = CASE WHEN status = 'timeout' THEN 'pending' ELSE status END, + timeout_seconds = ?, + updated_at = ?, + completed_at = CASE WHEN status = 'timeout' THEN NULL ELSE completed_at END + WHERE id = ?""", + (timeout_seconds, now, dispatch_id), + ) + self.record_summon_event( + conn, + summon_id=None, + dispatch_id=dispatch_id, + agent_id=dispatch.get("coordinator_agent_id"), + event="dispatch.retry", + payload={"retry_timeout_seconds": retry_timeout_seconds}, + ) + return self.get_dispatch(conn, dispatch_id) + + def create_summon( + self, + conn: sqlite3.Connection, + discussion_id: str, + agent_id: str, + invited_by: str, + *, + dispatch_id: str | None = None, + role: str | None = None, + perspective: str | None = None, + required_skill: str | None = None, + transport: str | None = None, + endpoint: str | None = None, + expires_at: int | None = None, + idempotency_key: str | None = None, + allow_terminal_retry: bool = False, + metadata: dict[str, Any] | None = None, + ) -> dict[str, Any]: + if not self.get_discussion(conn, discussion_id): + raise DiscussionNotFoundError(f"Discussion {discussion_id} not found") + agent = self.get_agent(conn, agent_id) + if not agent: + raise ValueError(f"Agent {agent_id} is not registered") + if required_skill and required_skill not in agent.get("skills", []): + raise ValueError(f"Agent {agent_id} does not provide required skill: {required_skill}") + if dispatch_id and not self.get_dispatch(conn, dispatch_id): + raise ValueError(f"Dispatch {dispatch_id} not found") + if idempotency_key: + existing = self.get_summon(conn, idempotency_key=idempotency_key) + if existing: + reused = self._maybe_reuse_summon_for_retry( + conn, + existing, + dispatch_id=dispatch_id, + invited_by=invited_by, + role=role, + perspective=perspective, + required_skill=required_skill, + transport=transport or agent.get("transport"), + endpoint=endpoint or agent.get("endpoint"), + expires_at=expires_at, + idempotency_key=idempotency_key, + metadata=metadata, + allow_terminal_retry=allow_terminal_retry, + ) + if reused: + return reused + return existing + existing = self.get_summon(conn, discussion_id=discussion_id, agent_id=agent_id) + if existing: + reused = self._maybe_reuse_summon_for_retry( + conn, + existing, + dispatch_id=dispatch_id, + invited_by=invited_by, + role=role, + perspective=perspective, + required_skill=required_skill, + transport=transport or agent.get("transport"), + endpoint=endpoint or agent.get("endpoint"), + expires_at=expires_at, + idempotency_key=idempotency_key, + metadata=metadata, + allow_terminal_retry=allow_terminal_retry, + ) + if reused: + return reused + return existing + + now = int(time.time()) + if expires_at is None and dispatch_id: + dispatch = self.get_dispatch(conn, dispatch_id) + if dispatch: + expires_at = dispatch["created_at"] + int(dispatch["timeout_seconds"]) + summon_id = f"sm_{secrets.token_hex(6)}" + conn.execute( + """INSERT INTO summons + (id, dispatch_id, discussion_id, agent_id, role, perspective, + required_skill, status, invited_by, transport, endpoint, + idempotency_key, created_at, expires_at, metadata) + VALUES (?, ?, ?, ?, ?, ?, ?, 'pending', ?, ?, ?, ?, ?, ?, ?)""", + ( + summon_id, + dispatch_id, + discussion_id, + agent_id, + role, + perspective, + required_skill, + invited_by, + transport or agent.get("transport"), + endpoint or agent.get("endpoint"), + idempotency_key, + now, + expires_at, + json.dumps(metadata) if metadata else None, + ), + ) + self.record_summon_event( + conn, + summon_id=summon_id, + dispatch_id=dispatch_id, + agent_id=agent_id, + event="summon.created", + payload={"discussion_id": discussion_id, "required_skill": required_skill}, + ) + summon = self.get_summon(conn, summon_id) + if not summon: + raise RuntimeError("Failed to create summon") + return summon + + def _maybe_reuse_summon_for_retry( + self, + conn: sqlite3.Connection, + existing: dict[str, Any], + *, + dispatch_id: str | None, + invited_by: str, + role: str | None, + perspective: str | None, + required_skill: str | None, + transport: str | None, + endpoint: str | None, + expires_at: int | None, + idempotency_key: str | None, + metadata: dict[str, Any] | None, + allow_terminal_retry: bool, + ) -> dict[str, Any] | None: + if not allow_terminal_retry or existing["status"] == "accepted": + return None + + previous_dispatch = self.get_dispatch(conn, existing["dispatch_id"]) if existing.get("dispatch_id") else None + previous_dispatch_terminal = bool( + previous_dispatch and previous_dispatch["status"] in {"completed", "cancelled", "timeout"} + ) + summon_retryable = existing["status"] in {"declined", "timeout", "failed"} + if not summon_retryable and not previous_dispatch_terminal: + return None + + now = int(time.time()) + conn.execute( + """UPDATE summons + SET dispatch_id = ?, + status = 'pending', + invited_by = ?, + role = ?, + perspective = ?, + required_skill = ?, + transport = ?, + endpoint = ?, + delivery_result = NULL, + delivered_at = NULL, + responded_at = NULL, + expires_at = COALESCE(?, expires_at), + idempotency_key = COALESCE(?, idempotency_key), + metadata = COALESCE(?, metadata) + WHERE id = ?""", + ( + dispatch_id, + invited_by, + role, + perspective, + required_skill, + transport, + endpoint, + expires_at, + idempotency_key, + json.dumps(metadata) if metadata else None, + existing["id"], + ), + ) + self.record_summon_event( + conn, + summon_id=existing["id"], + dispatch_id=dispatch_id, + agent_id=existing.get("agent_id"), + event="summon.reused_for_retry", + payload={ + "previous_dispatch_id": existing.get("dispatch_id"), + "previous_status": existing.get("status"), + "retry_at": now, + }, + ) + return self.get_summon(conn, existing["id"]) + + def get_summon( + self, + conn: sqlite3.Connection, + summon_id: str | None = None, + *, + discussion_id: str | None = None, + agent_id: str | None = None, + idempotency_key: str | None = None, + ) -> dict[str, Any] | None: + if summon_id: + row = conn.execute("SELECT * FROM summons WHERE id = ?", (summon_id,)).fetchone() + elif idempotency_key: + row = conn.execute( + "SELECT * FROM summons WHERE idempotency_key = ?", + (idempotency_key,), + ).fetchone() + elif discussion_id and agent_id: + row = conn.execute( + "SELECT * FROM summons WHERE discussion_id = ? AND agent_id = ?", + (discussion_id, agent_id), + ).fetchone() + else: + raise ValueError("summon_id, idempotency_key, or discussion_id+agent_id is required") + return self._row_to_summon(row) if row else None + + def get_summons( + self, + conn: sqlite3.Connection, + *, + agent_id: str | None = None, + discussion_id: str | None = None, + dispatch_id: str | None = None, + status: str | None = None, + ) -> list[dict[str, Any]]: + query = "SELECT * FROM summons WHERE 1=1" + params: list[Any] = [] + if agent_id: + query += " AND agent_id = ?" + params.append(agent_id) + if discussion_id: + query += " AND discussion_id = ?" + params.append(discussion_id) + if dispatch_id: + query += " AND dispatch_id = ?" + params.append(dispatch_id) + if status: + if status not in VALID_SUMMON_STATUSES: + raise ValueError(f"Invalid summon status: {status}") + query += " AND status = ?" + params.append(status) + query += " ORDER BY created_at DESC" + rows = conn.execute(query, params).fetchall() + return [self._row_to_summon(row) for row in rows] + + def mark_summon_delivered( + self, + conn: sqlite3.Connection, + summon_id: str, + delivery_result: dict[str, Any], + *, + transport: str | None = None, + endpoint: str | None = None, + ) -> dict[str, Any] | None: + now = int(time.time()) + current = self.get_summon(conn, summon_id) + if not current: + return None + ok = delivery_result.get("ok", True) is not False + new_status = "delivered" if ok else "failed" + conn.execute( + """UPDATE summons + SET status = ?, + delivered_at = ?, + delivery_result = ?, + transport = COALESCE(?, transport), + endpoint = COALESCE(?, endpoint) + WHERE id = ? AND status IN ('pending', 'delivered', 'failed')""", + (new_status, now, json.dumps(delivery_result), transport, endpoint, summon_id), + ) + self.record_summon_event( + conn, + summon_id=summon_id, + dispatch_id=current.get("dispatch_id"), + agent_id=current.get("agent_id"), + event="summon.delivered" if ok else "summon.failed", + payload=delivery_result, + ) + return self.get_summon(conn, summon_id) + + def reset_summon_for_retry( + self, + conn: sqlite3.Connection, + summon_id: str, + *, + expires_at: int | None = None, + payload: dict[str, Any] | None = None, + ) -> dict[str, Any] | None: + current = self.get_summon(conn, summon_id) + if not current: + return None + if current["status"] in {"accepted", "declined"}: + return current + + conn.execute( + """UPDATE summons + SET status = 'pending', + delivered_at = NULL, + responded_at = NULL, + delivery_result = NULL, + expires_at = COALESCE(?, expires_at) + WHERE id = ? + AND status IN ('pending', 'delivered', 'failed', 'timeout')""", + (expires_at, summon_id), + ) + self.record_summon_event( + conn, + summon_id=summon_id, + dispatch_id=current.get("dispatch_id"), + agent_id=current.get("agent_id"), + event="summon.retry", + payload=payload, + ) + return self.get_summon(conn, summon_id) + + def respond_summon( + self, + conn: sqlite3.Connection, + discussion_id: str, + agent_id: str, + accept: bool, + *, + metadata: dict[str, Any] | None = None, + ) -> dict[str, Any]: + summon = self.get_summon(conn, discussion_id=discussion_id, agent_id=agent_id) + if not summon: + return {"error": "No summon found"} + if summon["status"] in {"accepted", "declined", "timeout"}: + return summon + + now = int(time.time()) + new_status = "accepted" if accept else "declined" + conn.execute( + """UPDATE summons + SET status = ?, responded_at = ?, + metadata = COALESCE(?, metadata) + WHERE id = ? AND status IN ('pending', 'delivered', 'failed')""", + (new_status, now, json.dumps(metadata) if metadata else None, summon["id"]), + ) + if accept: + self.add_participant( + conn, + discussion_id, + agent_id, + role=summon.get("role"), + perspective=summon.get("perspective"), + display_name=agent_id, + ) + self.record_summon_event( + conn, + summon_id=summon["id"], + dispatch_id=summon.get("dispatch_id"), + agent_id=agent_id, + event="summon.accepted" if accept else "summon.declined", + payload=metadata, + ) + updated = self.get_summon(conn, summon["id"]) + return updated or {"error": "Summon response could not be persisted"} + + def expire_summons(self, conn: sqlite3.Connection, now: int | None = None) -> int: + now = int(time.time()) if now is None else int(now) + rows = conn.execute( + """SELECT * FROM summons + WHERE expires_at IS NOT NULL + AND expires_at <= ? + AND status IN ('pending', 'delivered')""", + (now,), + ).fetchall() + for row in rows: + summon = self._row_to_summon(row) + conn.execute( + "UPDATE summons SET status = 'timeout', responded_at = ? WHERE id = ?", + (now, summon["id"]), + ) + self.record_summon_event( + conn, + summon_id=summon["id"], + dispatch_id=summon.get("dispatch_id"), + agent_id=summon.get("agent_id"), + event="summon.timeout", + payload={"expires_at": summon.get("expires_at"), "now": now}, + ) + return len(rows) + + def record_summon_event( + self, + conn: sqlite3.Connection, + summon_id: str | None, + dispatch_id: str | None, + agent_id: str | None, + event: str, + payload: dict[str, Any] | None = None, + ) -> int: + now = int(time.time()) + cur = conn.execute( + """INSERT INTO summon_events + (summon_id, dispatch_id, agent_id, event, payload, created_at) + VALUES (?, ?, ?, ?, ?, ?)""", + (summon_id, dispatch_id, agent_id, event, json.dumps(payload) if payload else None, now), + ) + return cur.lastrowid or 0 + + def list_summon_events( + self, + conn: sqlite3.Connection, + *, + summon_id: str | None = None, + dispatch_id: str | None = None, + agent_id: str | None = None, + ) -> list[dict[str, Any]]: + query = "SELECT * FROM summon_events WHERE 1=1" + params: list[Any] = [] + if summon_id: + query += " AND summon_id = ?" + params.append(summon_id) + if dispatch_id: + query += " AND dispatch_id = ?" + params.append(dispatch_id) + if agent_id: + query += " AND agent_id = ?" + params.append(agent_id) + query += " ORDER BY created_at ASC, id ASC" + rows = conn.execute(query, params).fetchall() + return [ + { + "id": row["id"], + "summon_id": row["summon_id"], + "dispatch_id": row["dispatch_id"], + "agent_id": row["agent_id"], + "event": row["event"], + "payload": self._loads_json(row["payload"], default={}), + "created_at": row["created_at"], + } + for row in rows + ] + + def dispatch_readiness(self, conn: sqlite3.Connection, dispatch_id: str) -> dict[str, Any]: + self.expire_summons(conn) + dispatch = self.get_dispatch(conn, dispatch_id) + if not dispatch: + raise ValueError(f"Dispatch {dispatch_id} not found") + summons = self.get_summons(conn, dispatch_id=dispatch_id) + total = len(summons) + counts = {status: 0 for status in VALID_SUMMON_STATUSES} + for summon in summons: + counts[summon["status"]] = counts.get(summon["status"], 0) + 1 + accepted = counts.get("accepted", 0) + min_accepts = int(dispatch.get("min_accepts") or 0) + policy = dispatch.get("start_policy") or "quorum" + now = int(time.time()) + timed_out = (now - int(dispatch["created_at"])) >= int(dispatch.get("timeout_seconds") or 0) + + ready = False + reason = "waiting" + terminal_timeout = False + if dispatch["status"] == "active": + ready = True + reason = "already_active" + elif policy == "immediate": + ready = True + reason = "immediate" + elif policy == "quorum": + ready = accepted >= min_accepts + reason = "quorum_met" if ready else "quorum_waiting" + terminal_timeout = timed_out and not ready + elif policy == "all": + ready = total > 0 and accepted == total + reason = "all_accepted" if ready else "all_waiting" + terminal_timeout = timed_out and not ready + elif policy == "timeout": + ready = timed_out and accepted >= min_accepts + reason = "timeout_ready" if ready else "timeout_waiting" + terminal_timeout = timed_out and not ready + + return { + "dispatch_id": dispatch_id, + "discussion_id": dispatch["discussion_id"], + "ready": ready, + "reason": reason, + "timed_out": timed_out, + "terminal_timeout": terminal_timeout, + "policy": policy, + "accepted": accepted, + "total": total, + "counts": counts, + "min_accepts": min_accepts, + "dispatch_status": dispatch["status"], + } + + def apply_dispatch_readiness(self, conn: sqlite3.Connection, dispatch_id: str) -> dict[str, Any]: + readiness = self.dispatch_readiness(conn, dispatch_id) + dispatch = self.get_dispatch(conn, dispatch_id) + if not dispatch: + raise ValueError(f"Dispatch {dispatch_id} not found") + now = int(time.time()) + + if readiness["ready"] and dispatch["status"] == "pending": + updated = self.update_dispatch_status(conn, dispatch_id, "active", started_at=now) + self.activate_discussion(conn, dispatch["discussion_id"]) + self.record_summon_event( + conn, + summon_id=None, + dispatch_id=dispatch_id, + agent_id=dispatch.get("coordinator_agent_id"), + event="dispatch.active", + payload=readiness, + ) + return {"dispatch": updated, "readiness": readiness, "discussion_activated": True} + + if readiness["terminal_timeout"] and dispatch["status"] == "pending": + updated = self.update_dispatch_status(conn, dispatch_id, "timeout", completed_at=now) + self.record_summon_event( + conn, + summon_id=None, + dispatch_id=dispatch_id, + agent_id=dispatch.get("coordinator_agent_id"), + event="dispatch.timeout", + payload=readiness, + ) + return {"dispatch": updated, "readiness": readiness, "discussion_activated": False} + + return {"dispatch": dispatch, "readiness": readiness, "discussion_activated": False} diff --git a/src/roundtable/mcp/bridges/codex.py b/src/roundtable/mcp/bridges/codex.py index 44f2094..d9bb639 100644 --- a/src/roundtable/mcp/bridges/codex.py +++ b/src/roundtable/mcp/bridges/codex.py @@ -4,6 +4,7 @@ import json import logging +import secrets import threading from http.server import BaseHTTPRequestHandler, HTTPServer from typing import Any @@ -33,11 +34,19 @@ def __init__( host: str = "127.0.0.1", display_name: str = "Codex Agent", db_path: str | None = None, + skills: list[str] | None = None, + availability: str = "idle", + accept_policy: str = "auto", + auth_token: str | None = None, ): self._agent_id = agent_id self._port = port self._host = host self._display_name = display_name + self._skills = skills or ["agent-roundtable"] + self._availability = availability + self._accept_policy = accept_policy + self._auth_token = auth_token self._db = RoundtableDB(db_path=db_path) self._core = RoundtableCore(db=self._db) self._server: HTTPServer | None = None @@ -70,11 +79,17 @@ def start(self) -> None: transport="http", endpoint=f"http://{self._host}:{self._port}", capabilities=["speak", "listen"], + metadata={ + "skills": self._skills, + "availability": self._availability, + "accept_policy": self._accept_policy, + **({"_bridge_auth_token": self._auth_token} if self._auth_token else {}), + }, ) finally: conn.close() - handler = _make_handler(self._core, self._db, self._agent_id) + handler = _make_handler(self) self._server = HTTPServer((self._host, self._port), handler) self._thread = threading.Thread(target=self._server.serve_forever, daemon=True) self._thread.start() @@ -92,14 +107,24 @@ async def generate_speech(self, context: dict[str, Any]) -> str: raise NotImplementedError("Codex generates speech via its own CLI process") -def _make_handler(core: RoundtableCore, db: RoundtableDB, agent_id: str) -> type[BaseHTTPRequestHandler]: +def _make_handler(bridge: CodexBridge) -> type[BaseHTTPRequestHandler]: """Create an HTTP request handler class with access to core/db.""" + core = bridge._core + db = bridge._db + agent_id = bridge.agent_id + auth_token = bridge._auth_token + class CodexBridgeHandler(BaseHTTPRequestHandler): def do_GET(self) -> None: path = urlparse(self.path).path if path == "/health": + conn = db.connect() + try: + db.touch_agent(conn, agent_id, availability=bridge._availability) + finally: + conn.close() self._respond(200, {"status": "ok", "agent_id": agent_id, "platform": "codex"}) elif path == "/agent": @@ -111,6 +136,8 @@ def do_GET(self) -> None: conn.close() elif path == "/inbox": + if not self._require_auth(): + return conn = db.connect() try: db.touch_agent(conn, agent_id) @@ -130,6 +157,8 @@ def do_GET(self) -> None: def do_POST(self) -> None: path = urlparse(self.path).path + if not self._require_auth(): + return content_length = int(self.headers.get("Content-Length", 0)) try: body = json.loads(self.rfile.read(content_length)) if content_length else {} @@ -150,6 +179,27 @@ def do_POST(self) -> None: ) self._respond(200, {"accepted": "error" not in result, "result": result}) + elif path == "/summon": + discussion_id = body.get("discussion_id") + if not discussion_id: + self._respond(400, {"error": "discussion_id required"}) + return + auto_accept = bridge._accept_policy == "auto" + if auto_accept: + result = handle_tool_call( + core, + db, + "roundtable_accept_summon", + { + "discussion_id": discussion_id, + "agent_id": agent_id, + "metadata": {"source": "codex_bridge", "summon_id": body.get("summon_id")}, + }, + ) + else: + result = {"status": "pending", "reason": "manual_accept_required"} + self._respond(200, {"accepted": auto_accept and "error" not in result, "result": result}) + elif path == "/tool": tool_name = body.get("name", "") arguments = body.get("arguments", {}) @@ -180,6 +230,19 @@ def do_POST(self) -> None: else: self._respond(404, {"error": "not found", "path": path}) + def _require_auth(self) -> bool: + if not auth_token: + return True + bearer = self.headers.get("Authorization", "") + token = "" + if bearer.startswith("Bearer "): + token = bearer[len("Bearer ") :].strip() + token = token or self.headers.get("X-Roundtable-Token", "") + if secrets.compare_digest(token, auth_token): + return True + self._respond(401, {"error": "unauthorized"}) + return False + def _respond(self, status: int, data: dict[str, Any]) -> None: payload = json.dumps(data, ensure_ascii=False, default=str).encode() self.send_response(status) diff --git a/src/roundtable/mcp/bridges/generic.py b/src/roundtable/mcp/bridges/generic.py index 82ef64b..d76d3e8 100644 --- a/src/roundtable/mcp/bridges/generic.py +++ b/src/roundtable/mcp/bridges/generic.py @@ -9,6 +9,7 @@ import json import logging +import secrets import threading import urllib.request from http.server import BaseHTTPRequestHandler, HTTPServer @@ -30,6 +31,7 @@ class GenericBridge(AgentBridge): GET /health — liveness check GET /inbox — read unread messages (marks them read) POST /tool — dispatch any roundtable tool + POST /summon — accept a dispatch summon and join POST /speak — shorthand for roundtable_speak GET /status/{disc_id} — discussion status GET /agent — this bridge's agent metadata @@ -48,6 +50,11 @@ def __init__( host: str = "127.0.0.1", display_name: str | None = None, capabilities: list[str] | None = None, + skills: list[str] | None = None, + availability: str = "idle", + accept_policy: str = "auto", + metadata: dict[str, Any] | None = None, + auth_token: str | None = None, webhook_url: str | None = None, db_path: str | None = None, ): @@ -57,6 +64,11 @@ def __init__( self._host = host self._display_name = display_name or agent_id self._capabilities = capabilities or ["speak", "listen"] + self._skills = skills or ["agent-roundtable"] + self._availability = availability + self._accept_policy = accept_policy + self._metadata = metadata or {} + self._auth_token = auth_token self._webhook_url = webhook_url self._db = RoundtableDB(db_path=db_path) self._core = RoundtableCore(db=self._db, on_event=self._on_core_event) @@ -86,6 +98,13 @@ def start(self) -> None: transport="http", endpoint=f"http://{self._host}:{self._port}", capabilities=self._capabilities, + metadata={ + **self._metadata, + "skills": self._skills, + "availability": self._availability, + "accept_policy": self._accept_policy, + **({"_bridge_auth_token": self._auth_token} if self._auth_token else {}), + }, ) finally: conn.close() @@ -132,19 +151,25 @@ def _post_webhook(self, event_type: str, payload: dict[str, Any]) -> None: ) urllib.request.urlopen(req, timeout=2) except Exception as e: - logger.debug("Webhook delivery failed: %s", e) + logger.warning("Webhook delivery failed: %s", e) def _make_handler(bridge: GenericBridge) -> type[BaseHTTPRequestHandler]: db = bridge._db core = bridge._core agent_id = bridge.agent_id + auth_token = bridge._auth_token class Handler(BaseHTTPRequestHandler): def do_GET(self) -> None: path = urlparse(self.path).path if path == "/health": + conn = db.connect() + try: + db.touch_agent(conn, agent_id, availability=bridge._availability) + finally: + conn.close() self._respond( 200, { @@ -163,6 +188,8 @@ def do_GET(self) -> None: conn.close() elif path == "/inbox": + if not self._require_auth(): + return conn = db.connect() try: db.touch_agent(conn, agent_id) @@ -182,6 +209,8 @@ def do_GET(self) -> None: def do_POST(self) -> None: path = urlparse(self.path).path + if not self._require_auth(): + return length = int(self.headers.get("Content-Length", 0)) try: body = json.loads(self.rfile.read(length)) if length else {} @@ -212,6 +241,28 @@ def do_POST(self) -> None: bridge._post_webhook("invitation", {"request": body, "result": result}) self._respond(200, {"accepted": "error" not in result, "result": result}) + elif path == "/summon": + discussion_id = body.get("discussion_id") + if not discussion_id: + self._respond(400, {"error": "discussion_id required"}) + return + auto_accept = bridge._accept_policy == "auto" + if auto_accept: + result = handle_tool_call( + core, + db, + "roundtable_accept_summon", + { + "discussion_id": discussion_id, + "agent_id": agent_id, + "metadata": {"source": "generic_bridge", "summon_id": body.get("summon_id")}, + }, + ) + else: + result = {"status": "pending", "reason": "manual_accept_required"} + bridge._post_webhook("summon", {"request": body, "result": result}) + self._respond(200, {"accepted": auto_accept and "error" not in result, "result": result}) + elif path == "/turn": bridge._post_webhook("turn", body) self._respond(200, {"received": True, "payload": body}) @@ -234,6 +285,19 @@ def do_POST(self) -> None: else: self._respond(404, {"error": "not found", "path": path}) + def _require_auth(self) -> bool: + if not auth_token: + return True + bearer = self.headers.get("Authorization", "") + token = "" + if bearer.startswith("Bearer "): + token = bearer[len("Bearer ") :].strip() + token = token or self.headers.get("X-Roundtable-Token", "") + if secrets.compare_digest(token, auth_token): + return True + self._respond(401, {"error": "unauthorized"}) + return False + def _respond(self, status: int, data: dict[str, Any]) -> None: payload = json.dumps(data, ensure_ascii=False, default=str).encode() self.send_response(status) diff --git a/src/roundtable/mcp/tools.py b/src/roundtable/mcp/tools.py index 6ba14b7..fbb82dd 100644 --- a/src/roundtable/mcp/tools.py +++ b/src/roundtable/mcp/tools.py @@ -34,16 +34,44 @@ }, "transport": {"type": "string", "enum": ["stdio", "http"], "default": "stdio"}, "endpoint": {"type": "string", "description": "Webhook URL for http transport agents"}, + "metadata": {"type": "object", "description": "Agent registry metadata"}, + "skills": { + "type": "array", + "items": {"type": "string"}, + "description": "Installed skills, e.g. agent-roundtable", + }, + "skill_versions": {"type": "object", "description": "Skill version map"}, + "roles": {"type": "array", "items": {"type": "string"}, "description": "Preferred roles"}, + "availability": {"type": "string", "description": "idle|busy|offline or platform-specific state"}, + "accept_policy": {"type": "string", "description": "auto|manual|never"}, }, "required": ["agent_id", "platform"], }, }, { "name": "roundtable_list_agents", - "description": "List registered agents. Use online_only=true to see only active agents.", + "description": "List registered agents. Use filters to discover active agents with a required skill.", + "inputSchema": { + "type": "object", + "properties": { + "online_only": {"type": "boolean", "default": False}, + "timeout_seconds": {"type": "integer", "default": 90}, + "required_skill": {"type": "string"}, + "availability": {"type": "string"}, + }, + }, + }, + { + "name": "roundtable_heartbeat", + "description": "Refresh this agent's runtime presence and availability.", "inputSchema": { "type": "object", - "properties": {"online_only": {"type": "boolean", "default": False}}, + "properties": { + "agent_id": {"type": "string"}, + "availability": {"type": "string"}, + "metadata": {"type": "object"}, + }, + "required": ["agent_id"], }, }, { @@ -64,8 +92,107 @@ "web": {"type": "boolean", "default": False, "description": "Start the Web Viewer for this discussion"}, "invite_agents": {"type": "array", "items": {"type": "string"}, "description": "Agent IDs to invite"}, "created_by": {"type": "string", "description": "Creator agent ID"}, + "status": {"type": "string", "enum": ["assembling", "active"], "default": "active"}, + }, + "required": ["topic"], + }, + }, + { + "name": "roundtable_summon_agents", + "description": "Summon registered agents into a dispatch, optionally creating an assembling discussion first.", + "inputSchema": { + "type": "object", + "properties": { + "discussion_id": {"type": "string", "description": "Existing discussion to summon into"}, + "topic": {"type": "string", "description": "Topic when creating a new assembling discussion"}, + "context": {"type": "string"}, + "participants": {"type": "array", "items": {"type": "object"}, "default": []}, + "max_rounds": {"type": "integer", "default": 3}, + "speech_order": {"type": "string", "default": "fixed"}, + "web": {"type": "boolean", "default": False}, + "coordinator_agent_id": {"type": "string"}, + "agent_ids": {"type": "array", "items": {"type": "string"}, "description": "Explicit agents to summon"}, + "required_skill": {"type": "string", "description": "Only summon agents advertising this skill"}, + "availability": {"type": "string", "description": "Only summon agents with this availability"}, + "online_only": {"type": "boolean", "default": True}, + "timeout_seconds": {"type": "integer", "default": 90}, + "dispatch_timeout_seconds": {"type": "integer", "default": 60}, + "mode": {"type": "string", "enum": ["managed", "federated"], "default": "federated"}, + "start_policy": { + "type": "string", + "enum": ["immediate", "quorum", "all", "timeout"], + "default": "quorum", + }, + "min_accepts": {"type": "integer", "default": 1}, + "role": {"type": "string"}, + "perspective": {"type": "string"}, + "metadata": {"type": "object"}, + "idempotency_key": {"type": "string"}, + "allow_terminal_retry": { + "type": "boolean", + "default": False, + "description": "Release a terminal idempotency_key and create a retry dispatch.", + }, + }, + "required": ["coordinator_agent_id"], + }, + }, + { + "name": "roundtable_dispatch_status", + "description": "Inspect a dispatch and apply readiness/timeout transitions.", + "inputSchema": { + "type": "object", + "properties": { + "dispatch_id": {"type": "string"}, + "discussion_id": {"type": "string"}, + }, + }, + }, + { + "name": "roundtable_retry_summon", + "description": "Retry pending, failed, delivered, or timed-out summons without creating duplicate summon rows.", + "inputSchema": { + "type": "object", + "properties": { + "dispatch_id": {"type": "string"}, + "summon_id": {"type": "string"}, + "discussion_id": {"type": "string"}, + "agent_ids": {"type": "array", "items": {"type": "string"}}, + "statuses": { + "type": "array", + "items": {"type": "string"}, + "default": ["pending", "delivered", "failed", "timeout"], + }, + "retry_timeout_seconds": {"type": "integer", "default": 60}, + "requeue_inbox": {"type": "boolean", "default": True}, + "redeliver_http": {"type": "boolean", "default": True}, + }, + }, + }, + { + "name": "roundtable_accept_summon", + "description": "Accept a summon and join its discussion.", + "inputSchema": { + "type": "object", + "properties": { + "discussion_id": {"type": "string"}, + "agent_id": {"type": "string"}, + "metadata": {"type": "object"}, + }, + "required": ["discussion_id", "agent_id"], + }, + }, + { + "name": "roundtable_decline_summon", + "description": "Decline a summon.", + "inputSchema": { + "type": "object", + "properties": { + "discussion_id": {"type": "string"}, + "agent_id": {"type": "string"}, + "metadata": {"type": "object"}, }, - "required": ["topic", "participants"], + "required": ["discussion_id", "agent_id"], }, }, { @@ -218,7 +345,7 @@ "inputSchema": { "type": "object", "properties": { - "status": {"type": "string", "enum": ["active", "concluded", "cancelled"]}, + "status": {"type": "string", "enum": ["assembling", "active", "concluded", "cancelled"]}, "limit": {"type": "integer", "default": 20}, }, }, @@ -238,6 +365,7 @@ def handle_tool_call(core: RoundtableCore, db: RoundtableDB, name: str, argument conn = db.connect() try: if name == "roundtable_register_agent": + metadata = _agent_metadata_from_arguments(arguments) return db.upsert_agent( conn, agent_id=arguments["agent_id"], @@ -247,19 +375,37 @@ def handle_tool_call(core: RoundtableCore, db: RoundtableDB, name: str, argument capabilities=arguments.get("capabilities"), transport=arguments.get("transport", "stdio"), endpoint=arguments.get("endpoint"), + metadata=metadata, ) elif name == "roundtable_list_agents": - return {"agents": db.list_agents(conn, online_only=arguments.get("online_only", False))} + return { + "agents": db.list_agents( + conn, + online_only=arguments.get("online_only", False), + timeout_seconds=arguments.get("timeout_seconds", 90), + required_skill=arguments.get("required_skill"), + availability=arguments.get("availability"), + ) + } + + elif name == "roundtable_heartbeat": + return db.heartbeat_agent( + conn, + arguments["agent_id"], + availability=arguments.get("availability"), + metadata=arguments.get("metadata"), + ) elif name == "roundtable_create": result = core.create_discussion( topic=arguments["topic"], - participants=arguments["participants"], + participants=arguments.get("participants", []), context=arguments.get("context"), max_rounds=arguments.get("max_rounds", 3), speech_order=arguments.get("speech_order", "fixed"), created_by=arguments.get("created_by", "coordinator"), + status=arguments.get("status", "active"), web=arguments.get("web", False), ) invite_agents = arguments.get("invite_agents", []) @@ -279,6 +425,43 @@ def handle_tool_call(core: RoundtableCore, db: RoundtableDB, name: str, argument result["invites"] = invite_results return result + elif name == "roundtable_summon_agents": + return _summon_agents(core, db, conn, arguments) + + elif name == "roundtable_dispatch_status": + return _dispatch_status(db, conn, arguments) + + elif name == "roundtable_retry_summon": + return _retry_summon(core, db, conn, arguments) + + elif name == "roundtable_accept_summon": + result = db.respond_summon( + conn, + arguments["discussion_id"], + arguments["agent_id"], + accept=True, + metadata=arguments.get("metadata"), + ) + dispatch_id = result.get("dispatch_id") if isinstance(result, dict) else None + if dispatch_id: + result["dispatch"] = db.apply_dispatch_readiness(conn, dispatch_id) + core._sync_web_discussion_state(arguments["discussion_id"], conn) + return result + + elif name == "roundtable_decline_summon": + result = db.respond_summon( + conn, + arguments["discussion_id"], + arguments["agent_id"], + accept=False, + metadata=arguments.get("metadata"), + ) + dispatch_id = result.get("dispatch_id") if isinstance(result, dict) else None + if dispatch_id: + result["dispatch"] = db.apply_dispatch_readiness(conn, dispatch_id) + core._sync_web_discussion_state(arguments["discussion_id"], conn) + return result + elif name == "roundtable_invite": return _invite_agent( db, @@ -374,6 +557,274 @@ def handle_tool_call(core: RoundtableCore, db: RoundtableDB, name: str, argument conn.close() +def _agent_metadata_from_arguments(arguments: dict[str, Any]) -> dict[str, Any] | None: + metadata = dict(arguments.get("metadata") or {}) + for key in ("skills", "skill_versions", "roles", "availability", "accept_policy"): + value = arguments.get(key) + if value is not None: + metadata[key] = value + return metadata or None + + +def _summon_agents(core: RoundtableCore, db: RoundtableDB, conn: Any, arguments: dict[str, Any]) -> dict[str, Any]: + discussion_id = arguments.get("discussion_id") + created = None + if not discussion_id: + if not arguments.get("topic"): + return {"error": "topic is required when discussion_id is not provided"} + created = core.create_discussion( + topic=arguments["topic"], + participants=arguments.get("participants", []), + context=arguments.get("context"), + max_rounds=arguments.get("max_rounds", 3), + speech_order=arguments.get("speech_order", "fixed"), + created_by=arguments["coordinator_agent_id"], + status="assembling", + web=arguments.get("web", False), + ) + discussion_id = created["discussion_id"] + + agents = _select_summon_agents(db, conn, arguments) + if not agents: + return { + "ok": False, + "error": "No matching agents found", + "discussion_id": discussion_id, + "created": created, + } + + dispatch = db.create_dispatch( + conn, + discussion_id, + arguments["coordinator_agent_id"], + mode=arguments.get("mode", "federated"), + start_policy=arguments.get("start_policy", "quorum"), + min_accepts=arguments.get("min_accepts", 1), + timeout_seconds=arguments.get("dispatch_timeout_seconds", 60), + idempotency_key=arguments.get("idempotency_key"), + allow_terminal_retry=arguments.get("allow_terminal_retry", False), + metadata=arguments.get("metadata"), + ) + + summons = [] + deliveries = [] + expires_at = int(time.time()) + int(arguments.get("dispatch_timeout_seconds", 60)) + for agent in agents: + summon = db.create_summon( + conn, + discussion_id, + agent["agent_id"], + arguments["coordinator_agent_id"], + dispatch_id=dispatch["id"], + role=arguments.get("role"), + perspective=arguments.get("perspective"), + required_skill=arguments.get("required_skill"), + expires_at=expires_at, + allow_terminal_retry=arguments.get("allow_terminal_retry", False), + metadata=arguments.get("metadata"), + ) + summons.append(summon) + inbox_id = db.push_inbox( + conn, + agent["agent_id"], + "summon", + payload={ + "summon_id": summon["id"], + "dispatch_id": dispatch["id"], + "discussion_id": discussion_id, + "invited_by": arguments["coordinator_agent_id"], + "role": summon.get("role"), + "perspective": summon.get("perspective"), + "required_skill": summon.get("required_skill"), + "expires_at": summon.get("expires_at"), + }, + discussion_id=discussion_id, + ) + delivery = _deliver_http_summon(db, conn, summon["id"]) + if delivery: + deliveries.append(delivery) + else: + deliveries.append({"agent_id": agent["agent_id"], "inbox_message_id": inbox_id}) + + readiness = db.apply_dispatch_readiness(conn, dispatch["id"]) + core._sync_web_discussion_state(discussion_id, conn) + return { + "ok": True, + "discussion_id": discussion_id, + "created": created, + "dispatch": readiness.get("dispatch") or dispatch, + "readiness": readiness.get("readiness"), + "summons": db.get_summons(conn, dispatch_id=dispatch["id"]), + "deliveries": deliveries, + } + + +def _select_summon_agents(db: RoundtableDB, conn: Any, arguments: dict[str, Any]) -> list[dict[str, Any]]: + explicit_ids = arguments.get("agent_ids") or [] + if explicit_ids: + agents = [] + for agent_id in explicit_ids: + agent = db.get_agent(conn, agent_id) + if agent: + agents.append(agent) + else: + agents = db.list_agents( + conn, + online_only=arguments.get("online_only", True), + timeout_seconds=arguments.get("timeout_seconds", 90), + required_skill=arguments.get("required_skill"), + availability=arguments.get("availability"), + ) + + selected = [] + required_skill = arguments.get("required_skill") + availability = arguments.get("availability") + coordinator = arguments.get("coordinator_agent_id") + for agent in agents: + if agent["agent_id"] == coordinator: + continue + if required_skill and required_skill not in agent.get("skills", []): + continue + if availability and agent.get("availability") != availability: + continue + selected.append(agent) + return selected + + +def _dispatch_status(db: RoundtableDB, conn: Any, arguments: dict[str, Any]) -> dict[str, Any]: + dispatch_ids = [] + if arguments.get("dispatch_id"): + dispatch_ids.append(arguments["dispatch_id"]) + elif arguments.get("discussion_id"): + dispatch_ids.extend(d["id"] for d in db.list_dispatches(conn, discussion_id=arguments["discussion_id"])) + else: + return {"error": "dispatch_id or discussion_id is required"} + + results = [] + for dispatch_id in dispatch_ids: + status = db.apply_dispatch_readiness(conn, dispatch_id) + dispatch = status.get("dispatch") + results.append( + { + **status, + "summons": db.get_summons(conn, dispatch_id=dispatch_id), + "events": db.list_summon_events(conn, dispatch_id=dispatch_id), + "discussion_id": dispatch.get("discussion_id") if dispatch else None, + } + ) + return {"ok": True, "dispatches": results, "count": len(results)} + + +def _retry_summon(core: RoundtableCore, db: RoundtableDB, conn: Any, arguments: dict[str, Any]) -> dict[str, Any]: + summons = _select_retry_summons(db, conn, arguments) + if isinstance(summons, dict): + return summons + + retryable_statuses = {"pending", "delivered", "failed", "timeout"} + requested_statuses = set(arguments.get("statuses") or ["pending", "delivered", "failed", "timeout"]) + retry_statuses = requested_statuses & retryable_statuses + agent_ids = set(arguments.get("agent_ids") or []) + retry_timeout_seconds = int(arguments.get("retry_timeout_seconds", 60)) + expires_at = int(time.time()) + max(0, retry_timeout_seconds) if retry_timeout_seconds >= 0 else None + requeue_inbox = arguments.get("requeue_inbox", True) + redeliver_http = arguments.get("redeliver_http", True) + + retried = [] + skipped = [] + deliveries = [] + dispatch_ids: set[str] = set() + discussion_ids: set[str] = set() + for summon in summons: + if agent_ids and summon["agent_id"] not in agent_ids: + skipped.append({"summon_id": summon["id"], "agent_id": summon["agent_id"], "reason": "agent_filtered"}) + continue + if summon["status"] not in retry_statuses: + skipped.append( + { + "summon_id": summon["id"], + "agent_id": summon["agent_id"], + "status": summon["status"], + "reason": "status_not_retryable", + } + ) + continue + + if summon.get("dispatch_id"): + db.reopen_dispatch_for_retry( + conn, + summon["dispatch_id"], + retry_timeout_seconds=retry_timeout_seconds, + ) + dispatch_ids.add(summon["dispatch_id"]) + discussion_ids.add(summon["discussion_id"]) + reset = db.reset_summon_for_retry( + conn, + summon["id"], + expires_at=expires_at, + payload={"previous_status": summon["status"], "retry_timeout_seconds": retry_timeout_seconds}, + ) + if not reset: + skipped.append({"summon_id": summon["id"], "agent_id": summon["agent_id"], "reason": "not_found"}) + continue + + delivery: dict[str, Any] = {"agent_id": summon["agent_id"], "summon_id": summon["id"]} + if requeue_inbox: + inbox_id = db.push_inbox( + conn, + summon["agent_id"], + "summon", + payload={ + "summon_id": summon["id"], + "dispatch_id": summon.get("dispatch_id"), + "discussion_id": summon["discussion_id"], + "invited_by": summon["invited_by"], + "role": summon.get("role"), + "perspective": summon.get("perspective"), + "required_skill": summon.get("required_skill"), + "expires_at": expires_at, + "retry": True, + }, + discussion_id=summon["discussion_id"], + ) + delivery["inbox_message_id"] = inbox_id + if redeliver_http: + http_delivery = _deliver_http_summon(db, conn, summon["id"]) + if http_delivery: + delivery["http"] = http_delivery + deliveries.append(delivery) + retried.append(db.get_summon(conn, summon["id"]) or reset) + + dispatch_results = [] + for dispatch_id in sorted(dispatch_ids): + dispatch_results.append(db.apply_dispatch_readiness(conn, dispatch_id)) + for discussion_id in sorted(discussion_ids): + core._sync_web_discussion_state(discussion_id, conn) + + return { + "ok": True, + "retried": retried, + "skipped": skipped, + "deliveries": deliveries, + "dispatches": dispatch_results, + "count": len(retried), + } + + +def _select_retry_summons( + db: RoundtableDB, + conn: Any, + arguments: dict[str, Any], +) -> list[dict[str, Any]] | dict[str, Any]: + if arguments.get("summon_id"): + summon = db.get_summon(conn, arguments["summon_id"]) + return [summon] if summon else {"error": "summon not found"} + if arguments.get("dispatch_id"): + return db.get_summons(conn, dispatch_id=arguments["dispatch_id"]) + if arguments.get("discussion_id"): + return db.get_summons(conn, discussion_id=arguments["discussion_id"]) + return {"error": "summon_id, dispatch_id, or discussion_id is required"} + + def _invite_agent( db: RoundtableDB, conn: Any, @@ -412,12 +863,13 @@ def _invite_agent( def _add_participant_from_invite( db: RoundtableDB, conn: Any, discussion_id: str, agent_id: str, invitation: dict[str, Any] ) -> None: - now = int(time.time()) - conn.execute( - """INSERT OR IGNORE INTO participants - (discussion_id, participant, role, perspective, display_name, joined_at, is_active) - VALUES (?, ?, ?, ?, ?, ?, 1)""", - (discussion_id, agent_id, invitation.get("role"), invitation.get("perspective"), agent_id, now), + db.add_participant( + conn, + discussion_id, + agent_id, + role=invitation.get("role"), + perspective=invitation.get("perspective"), + display_name=agent_id, ) @@ -430,7 +882,7 @@ def _deliver_http_invitation( role: str | None, perspective: str | None, ) -> dict[str, Any] | None: - agent = db.get_agent(conn, agent_id) + agent = db.get_agent(conn, agent_id, include_private=True) if not agent or agent.get("transport") != "http" or not agent.get("endpoint"): return None @@ -447,7 +899,7 @@ def _deliver_http_invitation( request = urllib.request.Request( url, data=json.dumps(payload).encode(), - headers={"Content-Type": "application/json"}, + headers=_http_headers_for_agent(agent), method="POST", ) with urllib.request.urlopen(request, timeout=2) as response: @@ -457,6 +909,49 @@ def _deliver_http_invitation( return {"transport": "http", "endpoint": url, "ok": False, "error": str(exc)} +def _deliver_http_summon(db: RoundtableDB, conn: Any, summon_id: str) -> dict[str, Any] | None: + summon = db.get_summon(conn, summon_id) + if not summon: + return None + agent = db.get_agent(conn, summon["agent_id"], include_private=True) + if not agent or agent.get("transport") != "http" or not agent.get("endpoint"): + return None + + payload = { + "type": "summon", + "summon_id": summon["id"], + "dispatch_id": summon["dispatch_id"], + "discussion_id": summon["discussion_id"], + "agent_id": summon["agent_id"], + "invited_by": summon["invited_by"], + "role": summon["role"], + "perspective": summon["perspective"], + "required_skill": summon["required_skill"], + "expires_at": summon["expires_at"], + } + url = f"{str(agent['endpoint']).rstrip('/')}/summon" + try: + request = urllib.request.Request( + url, + data=json.dumps(payload).encode(), + headers=_http_headers_for_agent(agent), + method="POST", + ) + with urllib.request.urlopen(request, timeout=2) as response: + body = response.read().decode("utf-8") + delivery = {"agent_id": summon["agent_id"], "transport": "http", "endpoint": url, "ok": True, "response": body} + except (OSError, urllib.error.URLError, TimeoutError) as exc: + delivery = { + "agent_id": summon["agent_id"], + "transport": "http", + "endpoint": url, + "ok": False, + "error": str(exc), + } + db.mark_summon_delivered(conn, summon_id, delivery, transport="http", endpoint=url) + return delivery + + def _notify_next_speaker( db: RoundtableDB, conn: Any, @@ -464,7 +959,7 @@ def _notify_next_speaker( agent_id: str, round_num: Any, ) -> dict[str, Any]: - agent = db.get_agent(conn, agent_id) + agent = db.get_agent(conn, agent_id, include_private=True) if not agent: return {"skipped": True, "reason": "agent_not_registered"} @@ -484,7 +979,7 @@ def _notify_next_speaker( request = urllib.request.Request( url, data=json.dumps({"type": "turn", **payload}).encode(), - headers={"Content-Type": "application/json"}, + headers=_http_headers_for_agent(agent), method="POST", ) with urllib.request.urlopen(request, timeout=2) as response: @@ -495,6 +990,20 @@ def _notify_next_speaker( return result +def _http_headers_for_agent(agent: dict[str, Any]) -> dict[str, str]: + headers = {"Content-Type": "application/json"} + metadata = agent.get("metadata") or {} + token = ( + metadata.get("_bridge_auth_token") + or metadata.get("bridge_auth_token") + or metadata.get("auth_token") + or metadata.get("roundtable_auth_token") + ) + if token: + headers["Authorization"] = f"Bearer {token}" + return headers + + def _check_turn( core: RoundtableCore, discussion_id: str, diff --git a/src/roundtable/orchestrator.py b/src/roundtable/orchestrator.py new file mode 100644 index 0000000..fb0594a --- /dev/null +++ b/src/roundtable/orchestrator.py @@ -0,0 +1,101 @@ +"""Dispatch orchestration modes for Roundtable. + +Managed mode is for agents controlled inside one host/platform. Federated mode +uses the runtime registry, heartbeats, inbox, and HTTP bridge delivery. +""" + +from __future__ import annotations + +from typing import Any + +from roundtable.core import RoundtableCore +from roundtable.db import RoundtableDB +from roundtable.mcp.tools import handle_tool_call + + +class ManagedOrchestrator: + """Start same-platform meetings without runtime registry delivery.""" + + def __init__(self, db: RoundtableDB | None = None): + self.db = db or RoundtableDB() + self.core = RoundtableCore(db=self.db) + + def start_discussion( + self, + *, + topic: str, + participants: list[dict[str, Any]], + coordinator_agent_id: str = "coordinator", + context: str | None = None, + max_rounds: int = 3, + speech_order: str = "fixed", + web: bool = False, + metadata: dict[str, Any] | None = None, + ) -> dict[str, Any]: + discussion = self.core.create_discussion( + topic=topic, + participants=participants, + context=context, + max_rounds=max_rounds, + speech_order=speech_order, + created_by=coordinator_agent_id, + status="active", + web=web, + ) + conn = self.db.connect() + try: + dispatch = self.db.create_dispatch( + conn, + discussion["discussion_id"], + coordinator_agent_id, + mode="managed", + start_policy="immediate", + min_accepts=0, + timeout_seconds=0, + metadata=metadata, + ) + readiness = self.db.apply_dispatch_readiness(conn, dispatch["id"]) + finally: + conn.close() + return {"ok": True, "mode": "managed", "discussion": discussion, **readiness} + + +class FederatedOrchestrator: + """Summon registered agents through registry/inbox/HTTP delivery.""" + + def __init__(self, db: RoundtableDB | None = None): + self.db = db or RoundtableDB() + self.core = RoundtableCore(db=self.db) + + def summon( + self, + *, + coordinator_agent_id: str, + topic: str | None = None, + discussion_id: str | None = None, + agent_ids: list[str] | None = None, + required_skill: str | None = "agent-roundtable", + availability: str | None = None, + min_accepts: int = 1, + start_policy: str = "quorum", + dispatch_timeout_seconds: int = 60, + metadata: dict[str, Any] | None = None, + **kwargs: Any, + ) -> dict[str, Any]: + arguments: dict[str, Any] = { + "coordinator_agent_id": coordinator_agent_id, + "required_skill": required_skill, + "availability": availability, + "min_accepts": min_accepts, + "start_policy": start_policy, + "dispatch_timeout_seconds": dispatch_timeout_seconds, + "metadata": metadata, + **kwargs, + } + if topic is not None: + arguments["topic"] = topic + if discussion_id is not None: + arguments["discussion_id"] = discussion_id + if agent_ids is not None: + arguments["agent_ids"] = agent_ids + return handle_tool_call(self.core, self.db, "roundtable_summon_agents", arguments) diff --git a/src/roundtable/schema.py b/src/roundtable/schema.py index bf1a184..866f33b 100644 --- a/src/roundtable/schema.py +++ b/src/roundtable/schema.py @@ -3,9 +3,12 @@ import sqlite3 from collections.abc import Callable -VALID_DISCUSSION_STATUSES = {"active", "concluded", "cancelled"} +VALID_DISCUSSION_STATUSES = {"assembling", "active", "concluded", "cancelled"} VALID_SPEECH_ORDERS = {"fixed", "random", "priority", "free"} VALID_FINDING_TYPES = {"consensus", "disagreement", "new_point"} +VALID_DISPATCH_MODES = {"managed", "federated"} +VALID_DISPATCH_STATUSES = {"pending", "active", "completed", "cancelled", "timeout"} +VALID_SUMMON_STATUSES = {"pending", "delivered", "accepted", "declined", "timeout", "failed"} INITIATION_ROUND = 0 @@ -18,7 +21,7 @@ topic TEXT NOT NULL, context TEXT, status TEXT DEFAULT 'active' - CHECK(status IN ('active', 'concluded', 'cancelled')), + CHECK(status IN ('assembling', 'active', 'concluded', 'cancelled')), max_rounds INTEGER DEFAULT 5, current_round INTEGER DEFAULT 0, speech_order TEXT DEFAULT 'fixed' @@ -84,6 +87,117 @@ ON speeches(discussion_id, participant); CREATE INDEX IF NOT EXISTS idx_findings_discussion ON findings(discussion_id, type); + +CREATE TABLE IF NOT EXISTS agents ( + agent_id TEXT PRIMARY KEY, + platform TEXT NOT NULL, + display_name TEXT, + persona TEXT, + capabilities TEXT, + transport TEXT DEFAULT 'stdio', + endpoint TEXT, + last_seen INTEGER NOT NULL, + metadata TEXT +); + +CREATE TABLE IF NOT EXISTS agent_inbox ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + agent_id TEXT NOT NULL, + type TEXT NOT NULL, + discussion_id TEXT, + payload TEXT NOT NULL, + created_at INTEGER NOT NULL, + read_at INTEGER, + FOREIGN KEY (agent_id) REFERENCES agents(agent_id) ON DELETE CASCADE +); +CREATE INDEX IF NOT EXISTS idx_inbox_unread + ON agent_inbox(agent_id, read_at); +CREATE INDEX IF NOT EXISTS idx_inbox_discussion + ON agent_inbox(discussion_id, read_at); + +CREATE TABLE IF NOT EXISTS invitations ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + discussion_id TEXT NOT NULL, + agent_id TEXT NOT NULL, + role TEXT, + perspective TEXT, + status TEXT DEFAULT 'pending', + invited_by TEXT NOT NULL, + invited_at INTEGER NOT NULL, + responded_at INTEGER, + UNIQUE(discussion_id, agent_id) +); +CREATE INDEX IF NOT EXISTS idx_invitations_agent + ON invitations(agent_id, status); + +CREATE TABLE IF NOT EXISTS dispatches ( + id TEXT PRIMARY KEY, + discussion_id TEXT NOT NULL, + mode TEXT NOT NULL DEFAULT 'federated' + CHECK(mode IN ('managed', 'federated')), + coordinator_agent_id TEXT NOT NULL, + start_policy TEXT DEFAULT 'quorum', + min_accepts INTEGER DEFAULT 1, + timeout_seconds INTEGER DEFAULT 60, + status TEXT DEFAULT 'pending' + CHECK(status IN ('pending', 'active', 'completed', 'cancelled', 'timeout')), + idempotency_key TEXT UNIQUE, + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL, + started_at INTEGER, + completed_at INTEGER, + metadata TEXT, + FOREIGN KEY (discussion_id) REFERENCES discussions(id) ON DELETE CASCADE +); +CREATE INDEX IF NOT EXISTS idx_dispatches_discussion + ON dispatches(discussion_id, status); +CREATE INDEX IF NOT EXISTS idx_dispatches_coordinator + ON dispatches(coordinator_agent_id); + +CREATE TABLE IF NOT EXISTS summons ( + id TEXT PRIMARY KEY, + dispatch_id TEXT, + discussion_id TEXT NOT NULL, + agent_id TEXT NOT NULL, + role TEXT, + perspective TEXT, + required_skill TEXT, + status TEXT DEFAULT 'pending' + CHECK(status IN ('pending', 'delivered', 'accepted', 'declined', 'timeout', 'failed')), + invited_by TEXT NOT NULL, + transport TEXT, + endpoint TEXT, + delivery_result TEXT, + idempotency_key TEXT UNIQUE, + created_at INTEGER NOT NULL, + delivered_at INTEGER, + responded_at INTEGER, + expires_at INTEGER, + metadata TEXT, + FOREIGN KEY (dispatch_id) REFERENCES dispatches(id) ON DELETE CASCADE, + FOREIGN KEY (discussion_id) REFERENCES discussions(id) ON DELETE CASCADE, + UNIQUE(discussion_id, agent_id) +); +CREATE INDEX IF NOT EXISTS idx_summons_agent + ON summons(agent_id, status); +CREATE INDEX IF NOT EXISTS idx_summons_dispatch + ON summons(dispatch_id, status); +CREATE INDEX IF NOT EXISTS idx_summons_timeout + ON summons(status, expires_at); + +CREATE TABLE IF NOT EXISTS summon_events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + summon_id TEXT, + dispatch_id TEXT, + agent_id TEXT, + event TEXT NOT NULL, + payload TEXT, + created_at INTEGER NOT NULL, + FOREIGN KEY (summon_id) REFERENCES summons(id) ON DELETE CASCADE, + FOREIGN KEY (dispatch_id) REFERENCES dispatches(id) ON DELETE CASCADE +); +CREATE INDEX IF NOT EXISTS idx_summon_events_summon + ON summon_events(summon_id, created_at); """ @@ -127,6 +241,8 @@ def _migrate_v1_to_v2(conn: sqlite3.Connection) -> None: ); CREATE INDEX IF NOT EXISTS idx_inbox_unread ON agent_inbox(agent_id, read_at); + CREATE INDEX IF NOT EXISTS idx_inbox_discussion + ON agent_inbox(discussion_id, read_at); CREATE TABLE IF NOT EXISTS invitations ( id INTEGER PRIMARY KEY AUTOINCREMENT, @@ -145,9 +261,136 @@ def _migrate_v1_to_v2(conn: sqlite3.Connection) -> None: """) +def _migrate_v2_to_v3(conn: sqlite3.Connection) -> None: + """Add dispatch/summon tables and allow discussions to assemble before start.""" + table_row = conn.execute("SELECT sql FROM sqlite_master WHERE type = 'table' AND name = 'discussions'").fetchone() + needs_rebuild = table_row and "assembling" not in (table_row[0] or "") + if needs_rebuild: + fk = conn.execute("PRAGMA foreign_keys").fetchone()[0] + conn.execute("PRAGMA foreign_keys=OFF") + try: + conn.executescript(""" + CREATE TABLE IF NOT EXISTS discussions_new ( + id TEXT PRIMARY KEY, + topic TEXT NOT NULL, + context TEXT, + status TEXT DEFAULT 'active' + CHECK(status IN ('assembling', 'active', 'concluded', 'cancelled')), + max_rounds INTEGER DEFAULT 5, + current_round INTEGER DEFAULT 0, + speech_order TEXT DEFAULT 'fixed' + CHECK(speech_order IN ('fixed', 'random', 'priority', 'free')), + created_by TEXT NOT NULL, + created_at INTEGER NOT NULL, + concluded_at INTEGER, + conclusion TEXT, + convergence_score REAL, + output_path TEXT, + notifications TEXT + ); + INSERT OR IGNORE INTO discussions_new + (id, topic, context, status, max_rounds, current_round, + speech_order, created_by, created_at, concluded_at, + conclusion, convergence_score, output_path, notifications) + SELECT id, topic, context, status, max_rounds, current_round, + speech_order, created_by, created_at, concluded_at, + conclusion, convergence_score, output_path, notifications + FROM discussions; + DROP TABLE discussions; + ALTER TABLE discussions_new RENAME TO discussions; + """) + finally: + conn.execute(f"PRAGMA foreign_keys={'ON' if fk else 'OFF'}") + + conn.executescript(""" + CREATE TABLE IF NOT EXISTS dispatches ( + id TEXT PRIMARY KEY, + discussion_id TEXT NOT NULL, + mode TEXT NOT NULL DEFAULT 'federated' + CHECK(mode IN ('managed', 'federated')), + coordinator_agent_id TEXT NOT NULL, + start_policy TEXT DEFAULT 'quorum', + min_accepts INTEGER DEFAULT 1, + timeout_seconds INTEGER DEFAULT 60, + status TEXT DEFAULT 'pending' + CHECK(status IN ('pending', 'active', 'completed', 'cancelled', 'timeout')), + idempotency_key TEXT UNIQUE, + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL, + started_at INTEGER, + completed_at INTEGER, + metadata TEXT, + FOREIGN KEY (discussion_id) REFERENCES discussions(id) ON DELETE CASCADE + ); + CREATE INDEX IF NOT EXISTS idx_dispatches_discussion + ON dispatches(discussion_id, status); + CREATE INDEX IF NOT EXISTS idx_dispatches_coordinator + ON dispatches(coordinator_agent_id); + + CREATE TABLE IF NOT EXISTS summons ( + id TEXT PRIMARY KEY, + dispatch_id TEXT, + discussion_id TEXT NOT NULL, + agent_id TEXT NOT NULL, + role TEXT, + perspective TEXT, + required_skill TEXT, + status TEXT DEFAULT 'pending' + CHECK(status IN ('pending', 'delivered', 'accepted', 'declined', 'timeout', 'failed')), + invited_by TEXT NOT NULL, + transport TEXT, + endpoint TEXT, + delivery_result TEXT, + idempotency_key TEXT UNIQUE, + created_at INTEGER NOT NULL, + delivered_at INTEGER, + responded_at INTEGER, + expires_at INTEGER, + metadata TEXT, + FOREIGN KEY (dispatch_id) REFERENCES dispatches(id) ON DELETE CASCADE, + FOREIGN KEY (discussion_id) REFERENCES discussions(id) ON DELETE CASCADE, + UNIQUE(discussion_id, agent_id) + ); + CREATE INDEX IF NOT EXISTS idx_summons_agent + ON summons(agent_id, status); + CREATE INDEX IF NOT EXISTS idx_summons_dispatch + ON summons(dispatch_id, status); + CREATE INDEX IF NOT EXISTS idx_summons_timeout + ON summons(status, expires_at); + + CREATE TABLE IF NOT EXISTS summon_events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + summon_id TEXT, + dispatch_id TEXT, + agent_id TEXT, + event TEXT NOT NULL, + payload TEXT, + created_at INTEGER NOT NULL, + FOREIGN KEY (summon_id) REFERENCES summons(id) ON DELETE CASCADE, + FOREIGN KEY (dispatch_id) REFERENCES dispatches(id) ON DELETE CASCADE + ); + CREATE INDEX IF NOT EXISTS idx_summon_events_summon + ON summon_events(summon_id, created_at); + """) + + +def _migrate_v3_to_v4(conn: sqlite3.Connection) -> None: + """Add query indexes for dispatch filtering, summon expiry, and inbox sync.""" + conn.executescript(""" + CREATE INDEX IF NOT EXISTS idx_dispatches_coordinator + ON dispatches(coordinator_agent_id); + CREATE INDEX IF NOT EXISTS idx_summons_timeout + ON summons(status, expires_at); + CREATE INDEX IF NOT EXISTS idx_inbox_discussion + ON agent_inbox(discussion_id, read_at); + """) + + _MIGRATIONS: list[Callable[[sqlite3.Connection], None]] = [ _migrate_v0_to_v1, _migrate_v1_to_v2, + _migrate_v2_to_v3, + _migrate_v3_to_v4, ] CURRENT_SCHEMA_VERSION = len(_MIGRATIONS) diff --git a/src/roundtable/web/i18n.js b/src/roundtable/web/i18n.js index 731ffb3..d2df0c9 100644 --- a/src/roundtable/web/i18n.js +++ b/src/roundtable/web/i18n.js @@ -13,6 +13,7 @@ 'zh-CN': { // Header status statusWaiting: '等待中', + statusAssembling: '召集中', statusActive: '讨论中', statusConcluded: '已结束', @@ -25,6 +26,20 @@ waitingTitle: '讨论即将开始…', waitingSubtitle: '发言将会实时显示在这里', + // Dispatch + dispatchAssembling: '正在召集参与 Agent', + dispatchStatus: 'Agent 召集状态', + dispatchReady: '已就绪', + dispatchWaiting: '等待中', + dispatchAccepted: '已接受', + dispatchPending: '待响应', + dispatchDeclined: '已拒绝', + dispatchFailed: '异常', + dispatchModeManaged: '托管', + dispatchModeFederated: '联邦', + dispatchMinAccepts: '至少 {count} 位接受', + dispatchAgents: '{accepted}/{total} 已接受 · {waiting} 待响应 · {failed} 异常', + // Participants participants: '参与者', participantsCount: '位参与者', @@ -119,6 +134,7 @@ 'en-US': { // Header status statusWaiting: 'Waiting', + statusAssembling: 'Assembling', statusActive: 'Active', statusConcluded: 'Concluded', @@ -131,6 +147,20 @@ waitingTitle: 'Discussion starting soon…', waitingSubtitle: 'Speeches will appear here in real-time', + // Dispatch + dispatchAssembling: 'Summoning agents', + dispatchStatus: 'Agent dispatch status', + dispatchReady: 'Ready', + dispatchWaiting: 'Waiting', + dispatchAccepted: 'Accepted', + dispatchPending: 'Pending', + dispatchDeclined: 'Declined', + dispatchFailed: 'Issue', + dispatchModeManaged: 'Managed', + dispatchModeFederated: 'Federated', + dispatchMinAccepts: 'min {count} accepts', + dispatchAgents: '{accepted}/{total} accepted · {waiting} pending · {failed} issues', + // Participants participants: 'Participants', participantsCount: 'participants', diff --git a/src/roundtable/web/index.html b/src/roundtable/web/index.html index 4b200a3..7d55af9 100644 --- a/src/roundtable/web/index.html +++ b/src/roundtable/web/index.html @@ -90,6 +90,7 @@
发言将会实时显示在这里
+