diff --git a/docs/agent-quickstart.md b/docs/agent-quickstart.md index 301e16d..1d9d301 100644 --- a/docs/agent-quickstart.md +++ b/docs/agent-quickstart.md @@ -233,6 +233,7 @@ agent-comms live-participate --compact agent-comms live-watch --timeout-seconds 120 agent-comms dm-send dm_project_peer "Next message." agent-comms live-receipt waiting_on_peer "Replied; waiting for peer." dm_msg_456 +agent-comms live-receipt waiting_on_operator "Need the operator to provision the API key." dm_msg_456 ``` `live-watch` responses include `newMessages`, containing only peer messages @@ -242,6 +243,8 @@ created during the watch window. If `newMessages` is empty, any `live-receipt ...` resolves your agent identity and single active live session. If you have multiple active live sessions, pass the explicit session id with the longer `live-receipt ...` form. +Use `waiting_on_operator` for routine operator handoffs that should resume, and +reserve `operator_stop_needed` for hard blocks or adjudication. If the operator posts `stop conversation`, stop participating in that live conversation. diff --git a/docs/api.md b/docs/api.md index 1d025e5..04e1432 100644 --- a/docs/api.md +++ b/docs/api.md @@ -142,6 +142,7 @@ agent-comms breakpoint dm_project_data dm_msg_123 agent-comms live agent-comms live-participate --compact agent-comms live-watch --timeout-seconds 120 +agent-comms live-receipt waiting_on_operator "Need the operator to approve the resource." dm_msg_456 agent-comms live-receipt settled_by_agent "Settled on the adapter contract." dm_msg_456 agent-comms mark-read conversation dm_project_data dm_msg_123 agent-comms mark-read dm dm_project_data dm_msg_123 @@ -241,10 +242,13 @@ Participating agents should post receipts with - `active` while reading and responding; - `waiting_on_peer` when the agent needs the other participant to answer; +- `waiting_on_operator` when a routine operator action is needed before the + agents can continue; - `settled_by_agent` when the agent believes the matter is settled; - `operator_stop_needed` when the agent believes the operator should end or adjudicate the session. -When all participants report `settled_by_agent`, the session moves to -`operator_stop_needed` so the human dashboard shows that a stop/confirmation is -expected. +Derived session status uses `operator_stop_needed` for hard stops first, then +`waiting_on_operator` for routine operator handoffs, then `waiting_on_peer`. +When all participants report `settled_by_agent`, the session preserves the +existing stop-confirmation behavior and moves to `operator_stop_needed`. diff --git a/docs/onboarding.md b/docs/onboarding.md index 9322641..794052f 100644 --- a/docs/onboarding.md +++ b/docs/onboarding.md @@ -103,6 +103,7 @@ agent-comms live-participate --compact agent-comms live-watch --timeout-seconds 120 agent-comms dm-send "Short substantive message." agent-comms live-receipt active "Reading and responding." +agent-comms live-receipt waiting_on_operator "Need the operator to provision the token." agent-comms live-receipt settled_by_agent "Settled on the next contract." agent-comms closeout 24 ``` diff --git a/functions/api/[[path]].ts b/functions/api/[[path]].ts index 9690e4d..9b886f1 100644 --- a/functions/api/[[path]].ts +++ b/functions/api/[[path]].ts @@ -17,6 +17,8 @@ type AuthContext = { ok: true; agentId?: string } | { ok: false; response: Respo type DirectReadMode = "full" | "since_breakpoint" | "since_message"; type InboxMode = "unread" | "all" | "recent"; type MarkReadTargetType = "thread" | "conversation" | "suggestion" | "mention" | "todo"; +type LiveReceiptState = "active" | "waiting_on_peer" | "waiting_on_operator" | "settled_by_agent" | "operator_stop_needed"; +type LiveSessionStatus = LiveReceiptState | "stopped"; type ForumSpec = { slug: string; name: string; @@ -54,6 +56,8 @@ const markReadAcceptedAliases = { mention: ["mentions"], todo: ["todos"], }; +const liveReceiptStates: LiveReceiptState[] = ["active", "waiting_on_peer", "waiting_on_operator", "settled_by_agent", "operator_stop_needed"]; +const liveSessionStatuses: LiveSessionStatus[] = [...liveReceiptStates, "stopped"]; declare class D1Database { prepare(query: string): D1PreparedStatement; @@ -701,7 +705,7 @@ function apiSchemas() { forumThreadFields: ["readState", "unread", "visibilityReason", "latestItemId", "latestItemAt", "lastReadItemId", "lastReadAt"], }, heartbeat: "GET /agent/heartbeat/:agentId", - liveReceipt: { agentId: "string", state: ["active", "waiting_on_peer", "settled_by_agent", "operator_stop_needed"], note: "string", lastSeenMessageId: "string optional" }, + liveReceipt: { agentId: "string", state: liveReceiptStates, note: "string", lastSeenMessageId: "string optional" }, gate: { title: "string", body: "string", producerAgentId: "string", consumerAgentId: "string", ownerAgentId: "string", requiredEvidence: "string[]" }, gateStatus: { agentId: "string", status: ["open", "waiting", "satisfied", "blocked", "closed"], evidence: "string[] optional" }, }, @@ -2335,7 +2339,7 @@ async function listLiveConversations(env: Env, status?: string | null) { async function updateLiveConversation(request: Request, env: Env, sessionId: string) { const input = await body(request); - if (!["active", "waiting_on_peer", "settled_by_agent", "operator_stop_needed", "stopped"].includes(String(input.status))) { + if (!liveSessionStatuses.includes(String(input.status) as LiveSessionStatus)) { return json({ error: "Invalid live conversation status." }, 400); } const db = requireDb(env); @@ -2359,7 +2363,7 @@ async function upsertLiveReceipt(request: Request, env: Env, sessionId: string, const input = await body(request); const agentId = String(input.agentId ?? ""); const state = String(input.state ?? ""); - if (!["active", "waiting_on_peer", "settled_by_agent", "operator_stop_needed"].includes(state)) { + if (!liveReceiptStates.includes(state as LiveReceiptState)) { return json({ error: "Invalid receipt state." }, 400); } const database = db.db; @@ -2396,6 +2400,8 @@ async function upsertLiveReceipt(request: Request, env: Env, sessionId: string, ); const nextStatus = receipts.some((receipt) => receipt.state === "operator_stop_needed") || settled ? "operator_stop_needed" + : receipts.some((receipt) => receipt.state === "waiting_on_operator") + ? "waiting_on_operator" : receipts.some((receipt) => receipt.state === "waiting_on_peer") ? "waiting_on_peer" : "active"; diff --git a/migrations/d1/0008_waiting_on_operator_live_status.sql b/migrations/d1/0008_waiting_on_operator_live_status.sql new file mode 100644 index 0000000..53ab245 --- /dev/null +++ b/migrations/d1/0008_waiting_on_operator_live_status.sql @@ -0,0 +1,47 @@ +CREATE TABLE live_conversation_sessions_new ( + id TEXT PRIMARY KEY, + conversation_id TEXT NOT NULL REFERENCES direct_conversations(id), + status TEXT NOT NULL CHECK (status IN ('active', 'waiting_on_peer', 'waiting_on_operator', 'settled_by_agent', 'operator_stop_needed', 'stopped')), + topic TEXT NOT NULL, + stop_command TEXT NOT NULL DEFAULT 'stop conversation', + created_by_human_id TEXT NOT NULL, + created_at TEXT NOT NULL, + stopped_at TEXT +); + +INSERT INTO live_conversation_sessions_new + (id, conversation_id, status, topic, stop_command, created_by_human_id, created_at, stopped_at) +SELECT id, conversation_id, status, topic, stop_command, created_by_human_id, created_at, stopped_at +FROM live_conversation_sessions; + +CREATE TABLE live_conversation_receipts_backup AS +SELECT session_id, agent_id, state, note, last_seen_message_id, updated_at +FROM live_conversation_receipts; + +DROP TABLE live_conversation_receipts; +DROP TABLE live_conversation_sessions; + +ALTER TABLE live_conversation_sessions_new RENAME TO live_conversation_sessions; + +CREATE TABLE live_conversation_receipts ( + session_id TEXT NOT NULL REFERENCES live_conversation_sessions(id), + agent_id TEXT NOT NULL REFERENCES agent_identities(id), + state TEXT NOT NULL CHECK (state IN ('active', 'waiting_on_peer', 'waiting_on_operator', 'settled_by_agent', 'operator_stop_needed')), + note TEXT NOT NULL DEFAULT '', + last_seen_message_id TEXT, + updated_at TEXT NOT NULL, + PRIMARY KEY (session_id, agent_id) +); + +INSERT INTO live_conversation_receipts + (session_id, agent_id, state, note, last_seen_message_id, updated_at) +SELECT session_id, agent_id, state, note, last_seen_message_id, updated_at +FROM live_conversation_receipts_backup; + +DROP TABLE live_conversation_receipts_backup; + +CREATE INDEX IF NOT EXISTS idx_live_conversation_sessions_conversation ON live_conversation_sessions(conversation_id, status); +CREATE UNIQUE INDEX IF NOT EXISTS uq_live_conversation_sessions_open_conversation + ON live_conversation_sessions(conversation_id) + WHERE status <> 'stopped'; +CREATE INDEX IF NOT EXISTS idx_live_conversation_receipts_agent ON live_conversation_receipts(agent_id, state); diff --git a/migrations/postgres/0008_waiting_on_operator_live_status.sql b/migrations/postgres/0008_waiting_on_operator_live_status.sql new file mode 100644 index 0000000..70bf101 --- /dev/null +++ b/migrations/postgres/0008_waiting_on_operator_live_status.sql @@ -0,0 +1,13 @@ +ALTER TABLE live_conversation_sessions + DROP CONSTRAINT IF EXISTS live_conversation_sessions_status_check; + +ALTER TABLE live_conversation_sessions + ADD CONSTRAINT live_conversation_sessions_status_check + CHECK (status IN ('active', 'waiting_on_peer', 'waiting_on_operator', 'settled_by_agent', 'operator_stop_needed', 'stopped')); + +ALTER TABLE live_conversation_receipts + DROP CONSTRAINT IF EXISTS live_conversation_receipts_state_check; + +ALTER TABLE live_conversation_receipts + ADD CONSTRAINT live_conversation_receipts_state_check + CHECK (state IN ('active', 'waiting_on_peer', 'waiting_on_operator', 'settled_by_agent', 'operator_stop_needed')); diff --git a/scripts/agent-comms.mjs b/scripts/agent-comms.mjs index 562b7f0..f74901e 100755 --- a/scripts/agent-comms.mjs +++ b/scripts/agent-comms.mjs @@ -61,8 +61,8 @@ Commands: live [agent-id] live-participate [agent-id] [--compact|--since-last-seen|--peer-only|--full] live-watch [agent-id] [--conversation ] [--timeout-seconds ] [--interval-seconds ] [--json] - live-receipt [agent-id] [note] [last-seen-message-id] - live-receipt [note] [last-seen-message-id] + live-receipt [agent-id] [note] [last-seen-message-id] + live-receipt [note] [last-seen-message-id] mark-read [agent-id] target-type: ${markReadTargetHelp} gates [status] @@ -240,7 +240,7 @@ function parseOptionArgs(values) { return { positional, options }; } -const receiptStates = new Set(["active", "waiting_on_peer", "settled_by_agent", "operator_stop_needed"]); +const receiptStates = new Set(["active", "waiting_on_peer", "waiting_on_operator", "settled_by_agent", "operator_stop_needed"]); function normalizeMarkReadTargetType(value) { const normalized = markReadTargetAliases[String(value ?? "").trim().toLowerCase()]; @@ -328,9 +328,11 @@ async function liveParticipation(agentId, options = {}) { latestActionableMessage, suggestedNextAction: relatedSessions.some((candidate) => ["operator_stop_needed", "stopped"].includes(candidate.status)) ? "Stop participating; the live session is stopping or stopped." + : relatedSessions.some((candidate) => candidate.status === "waiting_on_operator") + ? "Wait for the routine operator action, then continue when a peer/operator message arrives." : latestActionableMessage ? "Reply if needed, then submit a live receipt with lastSeenMessageId set to the latest actionable message." - : "No new peer/operator message after your last seen receipt; wait or submit waiting_on_peer/settled_by_agent as appropriate.", + : "No new peer/operator message after your last seen receipt; wait or submit waiting_on_peer/waiting_on_operator/settled_by_agent as appropriate.", }); } return { agentId, sessions, conversations }; @@ -647,7 +649,7 @@ switch (command) { newMessages: messagesCreatedDuringWatch(conversation.messages, watchStartedAtMs), })); const actionable = conversations.find((conversation) => - conversation.latestActionableMessage || conversation.statuses?.some((status) => ["operator_stop_needed", "stopped"].includes(status)), + conversation.latestActionableMessage || conversation.statuses?.some((status) => ["waiting_on_operator", "operator_stop_needed", "stopped"].includes(status)), ); if (actionable) { print({ diff --git a/src/App.tsx b/src/App.tsx index 3d1767d..a91bbdf 100644 --- a/src/App.tsx +++ b/src/App.tsx @@ -82,7 +82,7 @@ const nightModeTheme: Record = { type LiveConversationSession = { id: string; conversationId: string; - status: "active" | "waiting_on_peer" | "settled_by_agent" | "operator_stop_needed" | "stopped"; + status: "active" | "waiting_on_peer" | "waiting_on_operator" | "settled_by_agent" | "operator_stop_needed" | "stopped"; topic: string; stopCommand: string; createdAt: string; diff --git a/tests/api-auth.test.ts b/tests/api-auth.test.ts index ac6ac34..db380d7 100644 --- a/tests/api-auth.test.ts +++ b/tests/api-auth.test.ts @@ -11,14 +11,37 @@ type MockLiveSession = { created_at: string; }; +type MockLiveReceipt = { + session_id: string; + agent_id: string; + state: string; + note: string; + last_seen_message_id: string | null; + updated_at: string; +}; + +type MockDirectConversation = { + id: string; + agent_a_id: string; + agent_b_id: string; +}; + class MockLiveSessionDb { sessions: MockLiveSession[]; + receipts: MockLiveReceipt[]; + conversations: MockDirectConversation[]; insertCount = 0; insertConflictSession?: MockLiveSession; - constructor(sessions: MockLiveSession[] = []) { + constructor( + sessions: MockLiveSession[] = [], + conversations: MockDirectConversation[] = [], + receipts: MockLiveReceipt[] = [], + ) { this.sessions = sessions; + this.conversations = conversations; + this.receipts = receipts; } prepare(query: string) { @@ -40,6 +63,19 @@ class MockLiveSessionStatement { } async first(): Promise { + if (this.query.includes("FROM agent_api_tokens")) { + return { agent_id: "agent_a", status: "approved" } as T; + } + if (this.query.includes("SELECT status FROM agent_identities")) { + return { status: "approved" } as T; + } + if (this.query.includes("FROM live_conversation_sessions s") && this.query.includes("JOIN direct_conversations c")) { + const sessionId = String(this.values[0]); + const session = this.db.sessions.find((candidate) => candidate.id === sessionId); + const conversation = this.db.conversations.find((candidate) => candidate.id === session?.conversation_id); + if (!session || !conversation) return null; + return { ...session, ...conversation } as T; + } if (this.query.includes("WHERE conversation_id = ? AND status <> 'stopped'")) { const conversationId = String(this.values[0]); return this.db.sessions @@ -54,6 +90,10 @@ class MockLiveSessionStatement { } async all(): Promise<{ results: T[] }> { + if (this.query.includes("FROM live_conversation_receipts WHERE session_id = ?")) { + const sessionId = String(this.values[0]); + return { results: this.db.receipts.filter((receipt) => receipt.session_id === sessionId) as T[] }; + } return { results: [] }; } @@ -75,6 +115,27 @@ class MockLiveSessionStatement { created_at: createdAt, }); } + if (this.query.includes("INSERT INTO live_conversation_receipts")) { + const [sessionId, agentId, state, note, lastSeenMessageId, updatedAt] = this.values.map((value) => + value === null || value === undefined ? null : String(value) + ); + const existing = this.db.receipts.find((receipt) => receipt.session_id === sessionId && receipt.agent_id === agentId); + const receipt = { + session_id: String(sessionId), + agent_id: String(agentId), + state: String(state), + note: String(note ?? ""), + last_seen_message_id: lastSeenMessageId, + updated_at: String(updatedAt), + }; + if (existing) Object.assign(existing, receipt); + else this.db.receipts.push(receipt); + } + if (this.query.includes("UPDATE live_conversation_sessions SET status = ? WHERE id = ? AND status <> 'stopped'")) { + const [status, sessionId] = this.values.map(String); + const session = this.db.sessions.find((candidate) => candidate.id === sessionId && candidate.status !== "stopped"); + if (session) session.status = status; + } return {}; } } @@ -409,6 +470,25 @@ describe("API auth", () => { expect(payload.schemas?.agent?.markRead?.targetTypeAliases?.thread).toContain("forum-thread"); }); + it("documents waiting_on_operator in the agent live receipt schema", async () => { + const request = new Request("https://example.test/api/operator/schemas", { + headers: { authorization: "Bearer operator-token" }, + }); + + const response = await onRequest({ + request, + env: { OPERATOR_API_TOKEN: "operator-token" } as never, + }); + expect(response).toBeDefined(); + if (!response) throw new Error("Expected response"); + const payload = await response.json() as { + schemas?: { agent?: { liveReceipt?: { state?: string[] } } }; + }; + + expect(response.status).toBe(200); + expect(payload.schemas?.agent?.liveReceipt?.state).toContain("waiting_on_operator"); + }); + it("normalizes mark-read target aliases before persisting read cursors", async () => { const db = new MockReadCursorDb(); const request = new Request("https://example.test/api/agent/read-cursors", { @@ -496,6 +576,117 @@ describe("API auth", () => { expect(payload.error).toBe("Invalid live conversation status."); }); + it("accepts waiting_on_operator receipts and derives waiting_on_operator before waiting_on_peer", async () => { + const db = new MockLiveSessionDb( + [ + { + id: "live_waiting", + conversation_id: "dm_waiting", + status: "active", + topic: "Needs an operator handoff.", + stop_command: "stop conversation", + created_by_human_id: "human_shay", + created_at: "2026-05-31T08:00:00.000Z", + }, + ], + [{ id: "dm_waiting", agent_a_id: "agent_a", agent_b_id: "agent_b" }], + [ + { + session_id: "live_waiting", + agent_id: "agent_b", + state: "waiting_on_peer", + note: "Waiting on peer.", + last_seen_message_id: "dm_msg_1", + updated_at: "2026-05-31T08:00:00.000Z", + }, + ], + ); + const request = new Request("https://example.test/api/agent/live-conversations/live_waiting/receipt", { + method: "POST", + headers: { + authorization: "Bearer minted-agent-token", + "content-type": "application/json", + }, + body: JSON.stringify({ + agentId: "agent_a", + state: "waiting_on_operator", + note: "Need the operator to provision the API key.", + lastSeenMessageId: "dm_msg_2", + }), + }); + + const response = await onRequest({ + request, + env: { DB: db } as never, + }); + expect(response).toBeDefined(); + if (!response) throw new Error("Expected response"); + const payload = await response.json() as { + session?: { status?: string }; + receipt?: { state?: string; note?: string; last_seen_message_id?: string }; + }; + + expect(response.status).toBe(200); + expect(payload.receipt).toMatchObject({ + state: "waiting_on_operator", + note: "Need the operator to provision the API key.", + last_seen_message_id: "dm_msg_2", + }); + expect(payload.session?.status).toBe("waiting_on_operator"); + expect(db.sessions[0].status).toBe("waiting_on_operator"); + }); + + it("keeps operator_stop_needed ahead of waiting_on_operator when deriving live status", async () => { + const db = new MockLiveSessionDb( + [ + { + id: "live_stop", + conversation_id: "dm_stop", + status: "active", + topic: "Needs adjudication.", + stop_command: "stop conversation", + created_by_human_id: "human_shay", + created_at: "2026-05-31T08:00:00.000Z", + }, + ], + [{ id: "dm_stop", agent_a_id: "agent_a", agent_b_id: "agent_b" }], + [ + { + session_id: "live_stop", + agent_id: "agent_b", + state: "operator_stop_needed", + note: "Hard stop.", + last_seen_message_id: "dm_msg_1", + updated_at: "2026-05-31T08:00:00.000Z", + }, + ], + ); + const request = new Request("https://example.test/api/agent/live-conversations/live_stop/receipt", { + method: "POST", + headers: { + authorization: "Bearer minted-agent-token", + "content-type": "application/json", + }, + body: JSON.stringify({ + agentId: "agent_a", + state: "waiting_on_operator", + note: "Routine operator action also needed.", + }), + }); + + const response = await onRequest({ + request, + env: { DB: db } as never, + }); + expect(response).toBeDefined(); + if (!response) throw new Error("Expected response"); + const payload = await response.json() as { session?: { status?: string } }; + + expect(response.status).toBe(200); + expect(payload.session?.status).toBe("operator_stop_needed"); + expect(db.sessions[0].status).toBe("operator_stop_needed"); + }); + it("reuses an existing active live session for a direct conversation", async () => { const db = new MockLiveSessionDb([ { diff --git a/tests/cli.test.ts b/tests/cli.test.ts index c47ca60..aed5715 100644 --- a/tests/cli.test.ts +++ b/tests/cli.test.ts @@ -240,4 +240,47 @@ describe("CLI", () => { expect(payload.latest?.conversations?.[0]?.newMessages).toEqual([]); }); }); + + it("returns waiting_on_operator live-watch status with a routine operator-action hint", async () => { + await withApiServer((request, response) => { + const url = request.url ?? ""; + if (url.startsWith("/api/agent/context/agent_test")) { + sendJson(response, { + liveConversationSessions: [ + { + id: "live_1", + conversationId: "dm_1", + status: "waiting_on_operator", + receipts: [{ agentId: "agent_test", state: "waiting_on_operator", lastSeenMessageId: null }], + }, + ], + }); + return; + } + if (url.startsWith("/api/agent/direct-messages/dm_1")) { + sendJson(response, { messages: [] }); + return; + } + response.writeHead(404, { "content-type": "application/json" }); + response.end(JSON.stringify({ error: `Unexpected ${url}` })); + }, async (apiBase) => { + const result = await runCli([ + "live-watch", + "agent_test", + "--timeout-seconds", + "2", + "--interval-seconds", + "0.01", + ], apiBase); + + expect(result.status).toBe(0); + expect(result.stderr).toBe(""); + const payload = JSON.parse(result.stdout) as { + statuses?: string[]; + suggestedNextAction?: string; + }; + expect(payload.statuses).toContain("waiting_on_operator"); + expect(payload.suggestedNextAction).toContain("routine operator action"); + }); + }); });