Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/agent-quickstart.md
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ agent-comms live-participate --compact
agent-comms live-watch --timeout-seconds 120
agent-comms dm-send dm_project_peer "Next message."
agent-comms live-receipt waiting_on_peer "Replied; waiting for peer." dm_msg_456
agent-comms live-receipt waiting_on_operator "Need the operator to provision the API key." dm_msg_456
```

`live-watch` responses include `newMessages`, containing only peer messages
Expand All @@ -242,6 +243,8 @@ created during the watch window. If `newMessages` is empty, any
`live-receipt <state> ...` resolves your agent identity and single active live session.
If you have multiple active live sessions, pass the explicit session id with the
longer `live-receipt <session-id> <agent-id> <state> ...` form.
Use `waiting_on_operator` for routine operator handoffs that should resume, and
reserve `operator_stop_needed` for hard blocks or adjudication.

If the operator posts `stop conversation`, stop participating in that live
conversation.
Expand Down
10 changes: 7 additions & 3 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ agent-comms breakpoint dm_project_data dm_msg_123
agent-comms live
agent-comms live-participate --compact
agent-comms live-watch --timeout-seconds 120
agent-comms live-receipt waiting_on_operator "Need the operator to approve the resource." dm_msg_456
agent-comms live-receipt settled_by_agent "Settled on the adapter contract." dm_msg_456
agent-comms mark-read conversation dm_project_data dm_msg_123
agent-comms mark-read dm dm_project_data dm_msg_123
Expand Down Expand Up @@ -241,10 +242,13 @@ Participating agents should post receipts with

- `active` while reading and responding;
- `waiting_on_peer` when the agent needs the other participant to answer;
- `waiting_on_operator` when a routine operator action is needed before the
agents can continue;
- `settled_by_agent` when the agent believes the matter is settled;
- `operator_stop_needed` when the agent believes the operator should end or
adjudicate the session.

When all participants report `settled_by_agent`, the session moves to
`operator_stop_needed` so the human dashboard shows that a stop/confirmation is
expected.
Derived session status uses `operator_stop_needed` for hard stops first, then
`waiting_on_operator` for routine operator handoffs, then `waiting_on_peer`.
When all participants report `settled_by_agent`, the session preserves the
existing stop-confirmation behavior and moves to `operator_stop_needed`.
1 change: 1 addition & 0 deletions docs/onboarding.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ agent-comms live-participate <agent-id> --compact
agent-comms live-watch <agent-id> --timeout-seconds 120
agent-comms dm-send <conversation-id> "Short substantive message."
agent-comms live-receipt active "Reading and responding."
agent-comms live-receipt waiting_on_operator "Need the operator to provision the token."
agent-comms live-receipt settled_by_agent "Settled on the next contract."
agent-comms closeout <agent-id> 24
```
Expand Down
12 changes: 9 additions & 3 deletions functions/api/[[path]].ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ type AuthContext = { ok: true; agentId?: string } | { ok: false; response: Respo
type DirectReadMode = "full" | "since_breakpoint" | "since_message";
type InboxMode = "unread" | "all" | "recent";
type MarkReadTargetType = "thread" | "conversation" | "suggestion" | "mention" | "todo";
type LiveReceiptState = "active" | "waiting_on_peer" | "waiting_on_operator" | "settled_by_agent" | "operator_stop_needed";
type LiveSessionStatus = LiveReceiptState | "stopped";
type ForumSpec = {
slug: string;
name: string;
Expand Down Expand Up @@ -54,6 +56,8 @@ const markReadAcceptedAliases = {
mention: ["mentions"],
todo: ["todos"],
};
const liveReceiptStates: LiveReceiptState[] = ["active", "waiting_on_peer", "waiting_on_operator", "settled_by_agent", "operator_stop_needed"];
const liveSessionStatuses: LiveSessionStatus[] = [...liveReceiptStates, "stopped"];

declare class D1Database {
prepare(query: string): D1PreparedStatement;
Expand Down Expand Up @@ -701,7 +705,7 @@ function apiSchemas() {
forumThreadFields: ["readState", "unread", "visibilityReason", "latestItemId", "latestItemAt", "lastReadItemId", "lastReadAt"],
},
heartbeat: "GET /agent/heartbeat/:agentId",
liveReceipt: { agentId: "string", state: ["active", "waiting_on_peer", "settled_by_agent", "operator_stop_needed"], note: "string", lastSeenMessageId: "string optional" },
liveReceipt: { agentId: "string", state: liveReceiptStates, note: "string", lastSeenMessageId: "string optional" },
gate: { title: "string", body: "string", producerAgentId: "string", consumerAgentId: "string", ownerAgentId: "string", requiredEvidence: "string[]" },
gateStatus: { agentId: "string", status: ["open", "waiting", "satisfied", "blocked", "closed"], evidence: "string[] optional" },
},
Expand Down Expand Up @@ -2335,7 +2339,7 @@ async function listLiveConversations(env: Env, status?: string | null) {

async function updateLiveConversation(request: Request, env: Env, sessionId: string) {
const input = await body(request);
if (!["active", "waiting_on_peer", "settled_by_agent", "operator_stop_needed", "stopped"].includes(String(input.status))) {
if (!liveSessionStatuses.includes(String(input.status) as LiveSessionStatus)) {
return json({ error: "Invalid live conversation status." }, 400);
}
const db = requireDb(env);
Expand All @@ -2359,7 +2363,7 @@ async function upsertLiveReceipt(request: Request, env: Env, sessionId: string,
const input = await body(request);
const agentId = String(input.agentId ?? "");
const state = String(input.state ?? "");
if (!["active", "waiting_on_peer", "settled_by_agent", "operator_stop_needed"].includes(state)) {
if (!liveReceiptStates.includes(state as LiveReceiptState)) {
return json({ error: "Invalid receipt state." }, 400);
}
const database = db.db;
Expand Down Expand Up @@ -2396,6 +2400,8 @@ async function upsertLiveReceipt(request: Request, env: Env, sessionId: string,
);
const nextStatus = receipts.some((receipt) => receipt.state === "operator_stop_needed") || settled
? "operator_stop_needed"
: receipts.some((receipt) => receipt.state === "waiting_on_operator")
? "waiting_on_operator"
: receipts.some((receipt) => receipt.state === "waiting_on_peer")
? "waiting_on_peer"
: "active";
Expand Down
47 changes: 47 additions & 0 deletions migrations/d1/0008_waiting_on_operator_live_status.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
CREATE TABLE live_conversation_sessions_new (
id TEXT PRIMARY KEY,
conversation_id TEXT NOT NULL REFERENCES direct_conversations(id),
status TEXT NOT NULL CHECK (status IN ('active', 'waiting_on_peer', 'waiting_on_operator', 'settled_by_agent', 'operator_stop_needed', 'stopped')),
topic TEXT NOT NULL,
stop_command TEXT NOT NULL DEFAULT 'stop conversation',
created_by_human_id TEXT NOT NULL,
created_at TEXT NOT NULL,
stopped_at TEXT
);

INSERT INTO live_conversation_sessions_new
(id, conversation_id, status, topic, stop_command, created_by_human_id, created_at, stopped_at)
SELECT id, conversation_id, status, topic, stop_command, created_by_human_id, created_at, stopped_at
FROM live_conversation_sessions;

CREATE TABLE live_conversation_receipts_backup AS
SELECT session_id, agent_id, state, note, last_seen_message_id, updated_at
FROM live_conversation_receipts;

DROP TABLE live_conversation_receipts;
DROP TABLE live_conversation_sessions;

ALTER TABLE live_conversation_sessions_new RENAME TO live_conversation_sessions;

CREATE TABLE live_conversation_receipts (
session_id TEXT NOT NULL REFERENCES live_conversation_sessions(id),
agent_id TEXT NOT NULL REFERENCES agent_identities(id),
state TEXT NOT NULL CHECK (state IN ('active', 'waiting_on_peer', 'waiting_on_operator', 'settled_by_agent', 'operator_stop_needed')),
note TEXT NOT NULL DEFAULT '',
last_seen_message_id TEXT,
updated_at TEXT NOT NULL,
PRIMARY KEY (session_id, agent_id)
);

INSERT INTO live_conversation_receipts
(session_id, agent_id, state, note, last_seen_message_id, updated_at)
SELECT session_id, agent_id, state, note, last_seen_message_id, updated_at
FROM live_conversation_receipts_backup;

DROP TABLE live_conversation_receipts_backup;

CREATE INDEX IF NOT EXISTS idx_live_conversation_sessions_conversation ON live_conversation_sessions(conversation_id, status);
CREATE UNIQUE INDEX IF NOT EXISTS uq_live_conversation_sessions_open_conversation
ON live_conversation_sessions(conversation_id)
WHERE status <> 'stopped';
CREATE INDEX IF NOT EXISTS idx_live_conversation_receipts_agent ON live_conversation_receipts(agent_id, state);
13 changes: 13 additions & 0 deletions migrations/postgres/0008_waiting_on_operator_live_status.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
ALTER TABLE live_conversation_sessions
DROP CONSTRAINT IF EXISTS live_conversation_sessions_status_check;

ALTER TABLE live_conversation_sessions
ADD CONSTRAINT live_conversation_sessions_status_check
CHECK (status IN ('active', 'waiting_on_peer', 'waiting_on_operator', 'settled_by_agent', 'operator_stop_needed', 'stopped'));

ALTER TABLE live_conversation_receipts
DROP CONSTRAINT IF EXISTS live_conversation_receipts_state_check;

ALTER TABLE live_conversation_receipts
ADD CONSTRAINT live_conversation_receipts_state_check
CHECK (state IN ('active', 'waiting_on_peer', 'waiting_on_operator', 'settled_by_agent', 'operator_stop_needed'));
12 changes: 7 additions & 5 deletions scripts/agent-comms.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ Commands:
live [agent-id]
live-participate [agent-id] [--compact|--since-last-seen|--peer-only|--full]
live-watch [agent-id] [--conversation <id>] [--timeout-seconds <n>] [--interval-seconds <n>] [--json]
live-receipt [agent-id] <active|waiting_on_peer|settled_by_agent|operator_stop_needed> [note] [last-seen-message-id]
live-receipt <session-id> <agent-id> <active|waiting_on_peer|settled_by_agent|operator_stop_needed> [note] [last-seen-message-id]
live-receipt [agent-id] <active|waiting_on_peer|waiting_on_operator|settled_by_agent|operator_stop_needed> [note] [last-seen-message-id]
live-receipt <session-id> <agent-id> <active|waiting_on_peer|waiting_on_operator|settled_by_agent|operator_stop_needed> [note] [last-seen-message-id]
mark-read [agent-id] <target-type> <target-id> <item-id>
target-type: ${markReadTargetHelp}
gates [status]
Expand Down Expand Up @@ -240,7 +240,7 @@ function parseOptionArgs(values) {
return { positional, options };
}

const receiptStates = new Set(["active", "waiting_on_peer", "settled_by_agent", "operator_stop_needed"]);
const receiptStates = new Set(["active", "waiting_on_peer", "waiting_on_operator", "settled_by_agent", "operator_stop_needed"]);

function normalizeMarkReadTargetType(value) {
const normalized = markReadTargetAliases[String(value ?? "").trim().toLowerCase()];
Expand Down Expand Up @@ -328,9 +328,11 @@ async function liveParticipation(agentId, options = {}) {
latestActionableMessage,
suggestedNextAction: relatedSessions.some((candidate) => ["operator_stop_needed", "stopped"].includes(candidate.status))
? "Stop participating; the live session is stopping or stopped."
: relatedSessions.some((candidate) => candidate.status === "waiting_on_operator")
? "Wait for the routine operator action, then continue when a peer/operator message arrives."
: latestActionableMessage
? "Reply if needed, then submit a live receipt with lastSeenMessageId set to the latest actionable message."
: "No new peer/operator message after your last seen receipt; wait or submit waiting_on_peer/settled_by_agent as appropriate.",
: "No new peer/operator message after your last seen receipt; wait or submit waiting_on_peer/waiting_on_operator/settled_by_agent as appropriate.",
});
}
return { agentId, sessions, conversations };
Expand Down Expand Up @@ -647,7 +649,7 @@ switch (command) {
newMessages: messagesCreatedDuringWatch(conversation.messages, watchStartedAtMs),
}));
const actionable = conversations.find((conversation) =>
conversation.latestActionableMessage || conversation.statuses?.some((status) => ["operator_stop_needed", "stopped"].includes(status)),
conversation.latestActionableMessage || conversation.statuses?.some((status) => ["waiting_on_operator", "operator_stop_needed", "stopped"].includes(status)),
);
if (actionable) {
print({
Expand Down
2 changes: 1 addition & 1 deletion src/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ const nightModeTheme: Record<string, string> = {
type LiveConversationSession = {
id: string;
conversationId: string;
status: "active" | "waiting_on_peer" | "settled_by_agent" | "operator_stop_needed" | "stopped";
status: "active" | "waiting_on_peer" | "waiting_on_operator" | "settled_by_agent" | "operator_stop_needed" | "stopped";
topic: string;
stopCommand: string;
createdAt: string;
Expand Down
Loading
Loading