Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions src/app/api/runtimes/[id]/talk/realtime/relay/route.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,74 @@ describe("POST /api/runtimes/[id]/talk/realtime/relay", () => {
expect(mockReleaseClient).toHaveBeenCalledWith(client);
});

// Verifies the fallback path: when the gateway's "final" chat event carries no
// text, the route reads the session's chat history and answers with the
// assistant message whose idempotencyKey equals the run id.
it("recovers empty final realtime consult events from chat history", async () => {
// Captures the wildcard ("*") listener the route registers on the gateway
// client so the test can push a gateway event into it by hand.
let gatewayHandler: ((payload: unknown) => void) | null = null;
const client = {
realtimeClientToolCall: vi.fn().mockResolvedValue({ ok: true, runId: "run_1" }),
realtimeRelayToolResult: vi.fn().mockResolvedValue({ ok: true }),
// Durable transcript: the assistant entry is tagged with the same run id
// ("run_1") that realtimeClientToolCall resolves with above.
chatHistory: vi.fn().mockResolvedValue({
messages: [
{
role: "user",
content: [{ type: "text", text: "Inspect this repo" }],
},
{
role: "assistant",
idempotencyKey: "run_1",
content: [{ type: "text", text: "Recovered from the durable transcript." }],
},
],
}),
on: vi.fn((event: string, handler: (payload: unknown) => void) => {
if (event === "*") gatewayHandler = handler;
}),
off: vi.fn(),
};
mockRuntimeRows.push({ id: "rt_1", ownerUserId: "user_1" });
mockGetGatewayClientForRuntime.mockResolvedValue(client);

// Deliberately not awaited yet: the request only resolves after the gateway
// event below has been delivered.
const responsePromise = POST(
new Request("http://localhost/api/runtimes/rt_1/talk/realtime/relay", {
method: "POST",
body: JSON.stringify({
action: "toolCall",
relaySessionId: "relay_1",
sessionKey: "main",
callId: "call_1",
name: "openclaw_agent_consult",
args: { prompt: "Inspect this repo" },
}),
}),
{ params: Promise.resolve({ id: "rt_1" }) },
);

// Wait until the route has subscribed to gateway events.
await vi.waitFor(() => expect(gatewayHandler).toBeTypeOf("function"));

// Emit a "final" chat event for the run with an empty content array, which
// is the condition that should trigger the history lookup.
(gatewayHandler as ((payload: unknown) => void) | null)?.({
event: "chat",
runId: "run_1",
state: "final",
message: { role: "assistant", content: [] },
});

const response = await responsePromise;
expect(response.status).toBe(200);
await expect(response.json()).resolves.toEqual({
result: {
delegated: true,
runId: "run_1",
finalText: "Recovered from the durable transcript.",
result: { ok: true },
},
});
expect(client.chatHistory).toHaveBeenCalledWith({ sessionKey: "main", limit: 25 });
// The second realtimeRelayToolResult call must carry the recovered text.
// NOTE(review): the first call is presumably the interim "working" result
// sent before the run finishes — confirm against the route implementation.
expect(client.realtimeRelayToolResult).toHaveBeenNthCalledWith(2, {
relaySessionId: "relay_1",
callId: "call_1",
result: { text: "Recovered from the durable transcript." },
});
});

it("rejects invalid relay actions before calling the gateway", async () => {
mockRuntimeRows.push({ id: "rt_1", ownerUserId: "user_1" });

Expand Down
233 changes: 220 additions & 13 deletions src/app/api/runtimes/[id]/talk/realtime/relay/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,16 @@ import { buildRuntimeReadWhere, getAgentAccessContext } from "@/lib/agent-access
import { canAccessChatSession } from "@/lib/chat-session-access";
import { publishChatProgressEvent } from "@/lib/chat-pubsub";
import { persistChatProgressEvent } from "@/lib/chat-session-events";
import { selectRecoveredAssistantText } from "@/lib/chat-recovery";
import { getGatewayClientForRuntime, holdClient, releaseClient } from "@/lib/gateway-chat-pool";
import type { GatewayClient } from "@/lib/gateway-client";

export const dynamic = "force-dynamic";
export const maxDuration = 120;
export const maxDuration = 620;

// Default `timeoutMs` for waitForChatFinal: how long to wait for the run's
// final text before attempting one last history recovery and rejecting (10 minutes).
const REALTIME_TOOL_RESULT_TIMEOUT_MS = 10 * 60_000;
// Delay before each scheduled chat-history recovery poll while a run is pending.
const REALTIME_HISTORY_POLL_INTERVAL_MS = 5_000;
// After a final event arrives with no text, how long history polling may keep
// looking for the reply before the placeholder "completed without returning
// text" answer is used.
const REALTIME_EMPTY_FINAL_HISTORY_GRACE_MS = 30_000;

// Closed set of relay action names.
// NOTE(review): presumably the accepted values of the request body's `action`
// field — confirm against the request parsing code.
type RelayAction = "audio" | "cancelOutput" | "mark" | "toolCall" | "toolResult" | "stop";

Expand Down Expand Up @@ -172,7 +177,13 @@ async function runRealtimeToolCall(
options: { willContinue: true },
});

const text = await waitForChatFinal(client, runId, audit);
const text = await waitForChatFinal({
client,
runId,
sessionKey: params.sessionKey,
args: params.args,
audit,
});
const result = await client.realtimeRelayToolResult({
relaySessionId: params.relaySessionId,
callId: params.callId,
Expand Down Expand Up @@ -203,24 +214,92 @@ function buildRealtimeToolWorkingResult() {
};
}

function waitForChatFinal(
client: GatewayClient,
runId: string,
audit: RealtimeAuditPublisher,
timeoutMs = 110_000,
) {
function waitForChatFinal(params: {
client: GatewayClient;
runId: string;
sessionKey: string;
args: unknown;
audit: RealtimeAuditPublisher;
timeoutMs?: number;
}) {
const {
client,
runId,
sessionKey,
args,
audit,
timeoutMs = REALTIME_TOOL_RESULT_TIMEOUT_MS,
} = params;

return new Promise<string>((resolve, reject) => {
let settled = false;
let historyPollInFlight = false;
let historyPollTimeout: ReturnType<typeof setTimeout> | null = null;
let emptyFinalDeadlineMs: number | null = null;

const timer = setTimeout(() => {
cleanup();
void audit.publish("run_error", { error: "Timed out waiting for OpenClaw realtime tool result" });
reject(new Error("Timed out waiting for OpenClaw realtime tool result"));
void recoverFromHistory("timeout").then((recovered) => {
if (recovered || settled) return;
cleanup();
void audit.publish("run_error", { error: "Timed out waiting for OpenClaw realtime tool result" });
reject(new Error("Timed out waiting for OpenClaw realtime tool result"));
});
}, timeoutMs);

const cleanup = () => {
settled = true;
clearTimeout(timer);
if (historyPollTimeout) clearTimeout(historyPollTimeout);
historyPollTimeout = null;
client.off("*", onEvent);
};

const resolveWithText = (text: string) => {
cleanup();
resolve(text);
};

const maybeResolveEmptyFinal = () => {
if (settled || emptyFinalDeadlineMs === null || Date.now() < emptyFinalDeadlineMs) return false;
resolveWithText("OpenClaw completed without returning text.");
return true;
};

const scheduleHistoryPoll = () => {
if (settled || historyPollTimeout) return;
historyPollTimeout = setTimeout(() => {
historyPollTimeout = null;
void recoverFromHistory("poll");
}, REALTIME_HISTORY_POLL_INTERVAL_MS);
};

const recoverFromHistory = async (reason: string) => {
if (settled || historyPollInFlight) return "";
historyPollInFlight = true;
try {
const recovered = await recoverRealtimeConsultTextFromHistory({
client,
runId,
sessionKey,
args,
});
if (recovered && !settled) {
resolveWithText(recovered);
return recovered;
}
if (!maybeResolveEmptyFinal()) scheduleHistoryPoll();
return "";
} catch (error) {
if (reason !== "poll") {
console.error(`[api/realtime/relay] Failed to recover realtime consult text from history (${reason}):`, error);
}
if (!maybeResolveEmptyFinal()) scheduleHistoryPoll();
return "";
} finally {
historyPollInFlight = false;
}
};

const onEvent = (payload: unknown) => {
const event = asRecord(payload);
if (!event) return;
Expand All @@ -233,8 +312,12 @@ function waitForChatFinal(
const state = firstString(event.state, event.status)?.toLowerCase();
if (state === "final" || state === "complete" || state === "completed") {
const text = extractText(event.message) || extractText(event);
cleanup();
resolve(text || "OpenClaw completed without returning text.");
if (text) {
resolveWithText(text);
return;
}
emptyFinalDeadlineMs ??= Date.now() + REALTIME_EMPTY_FINAL_HISTORY_GRACE_MS;
void recoverFromHistory("empty-final");
return;
}

Expand All @@ -248,6 +331,7 @@ function waitForChatFinal(
};

client.on("*", onEvent);
scheduleHistoryPoll();
});
}

Expand Down Expand Up @@ -318,6 +402,129 @@ async function resolveAuditSession(request: Request, sessionKey: string) {
return null;
}

/**
 * Tries to recover the assistant reply for a realtime consult run from the
 * session's chat history (latest 25 entries requested).
 *
 * Strategies are attempted in order, and the first non-empty text wins:
 *  1. an assistant message whose idempotency key is tied to `runId`;
 *  2. `selectRecoveredAssistantText`, keyed on the consult message rebuilt
 *     from `args`;
 *  3. the first assistant message after the last user message that matches
 *     the consult question.
 *
 * @param params.client Gateway client; only `chatHistory` is used.
 * @param params.runId Run id of the consult.
 * @param params.sessionKey Session whose history is read.
 * @param params.args Raw tool-call arguments of the consult.
 * @returns The recovered text, or `""` when nothing usable was found. A
 *   rejection from `chatHistory` is not caught here and propagates to the caller.
 */
async function recoverRealtimeConsultTextFromHistory(params: {
  client: Pick<GatewayClient, "chatHistory">;
  runId: string;
  sessionKey: string;
  args: unknown;
}) {
  const { client, runId, sessionKey, args } = params;

  const history = await client.chatHistory({ sessionKey, limit: 25 });
  const historyMessages = extractHistoryMessages(history);
  if (historyMessages.length === 0) return "";

  // Strongest signal: a message explicitly tagged with this run.
  const runScopedText = selectHistoryAssistantByRunId(historyMessages, runId);
  if (runScopedText) return runScopedText;

  // Next: match against the chat message the consult would have produced.
  const consultChatMessage = buildRealtimeConsultChatMessage(args);
  if (consultChatMessage) {
    const transcriptText = selectRecoveredAssistantText({
      messages: historyMessages,
      currentUserContents: [consultChatMessage],
    });
    if (transcriptText) return transcriptText;
  }

  // Last resort: loose match on the bare question text.
  const consultQuestion = readRealtimeConsultQuestion(args);
  if (!consultQuestion) return "";
  return selectAssistantAfterMatchingQuestion(historyMessages, consultQuestion);
}

// Normalized view of one chat-history entry, as produced by extractHistoryMessages.
type RealtimeHistoryMessage = {
// Lower-cased role taken from the entry's `role`, `type`, or `author`; null when none is present.
role: string | null;
// Text extracted from the entry; entries with empty text are dropped during extraction.
content: string;
// Value of the entry's `idempotencyKey` / `idempotency_key`; null when absent.
idempotencyKey: string | null;
};

/**
 * Normalizes a raw `chatHistory` result into a flat list of messages.
 *
 * The message list is taken from the first of these that is an array:
 * `result.messages`, `result.items`, or `result` itself. Entries whose
 * extracted text is empty are dropped.
 *
 * @param result Raw value returned by the gateway's chat history call.
 * @returns Normalized messages in their original order; `[]` when no list is found.
 */
function extractHistoryMessages(result: unknown): RealtimeHistoryMessage[] {
  const envelope = asRecord(result);
  const candidateLists = [envelope?.messages, envelope?.items, result];
  const rawMessages: unknown[] =
    candidateLists.find((candidate): candidate is unknown[] => Array.isArray(candidate)) ?? [];

  return rawMessages.flatMap((rawMessage): RealtimeHistoryMessage[] => {
    const entry = asRecord(rawMessage);
    // Different producers label the speaker differently; accept any of these fields.
    const role = firstString(entry?.role, entry?.type, entry?.author)?.toLowerCase() ?? null;
    // Fall back from `content` to `message` to the entry itself when looking for text.
    const content = extractText(entry?.content ?? entry?.message ?? entry);
    const idempotencyKey = firstString(entry?.idempotencyKey, entry?.idempotency_key) ?? null;
    return content ? [{ role, content, idempotencyKey }] : [];
  });
}

/**
 * Finds the most recent assistant message tied to a run.
 *
 * A message is tied to the run when its idempotency key equals `runId` or
 * starts with `runId` followed by a colon.
 *
 * @param messages Normalized history, oldest first.
 * @param runId Run id to look for.
 * @returns The content of the latest matching non-blank assistant message, or `""`.
 */
function selectHistoryAssistantByRunId(messages: RealtimeHistoryMessage[], runId: string) {
  const derivedKeyPrefix = `${runId}:`;

  // Walk newest-to-oldest so the latest matching message wins.
  for (const candidate of [...messages].reverse()) {
    if (candidate.role !== "assistant") continue;
    if (!candidate.content.trim()) continue;

    const key = candidate.idempotencyKey;
    if (key === runId || key?.startsWith(derivedKeyPrefix)) return candidate.content;
  }
  return "";
}

/**
 * Rebuilds the chat message text for a consult from its tool-call arguments.
 *
 * The message is the question, optionally followed by a `Context:` section and
 * a `Spoken style:` section, separated by blank lines.
 *
 * @param args Raw tool-call arguments (an object or a string).
 * @returns The assembled message, or `""` when no question can be read.
 */
function buildRealtimeConsultChatMessage(args: unknown) {
  const consultArgs = readRealtimeConsultArgs(args);
  const question = readRealtimeConsultQuestion(consultArgs);
  if (!question) return "";

  const sections = [question];

  // Optional sections are appended only when the argument yields a usable string.
  const context = firstString(consultArgs?.context);
  if (context) sections.push(`Context:\n${context}`);

  const responseStyle = firstString(consultArgs?.responseStyle);
  if (responseStyle) sections.push(`Spoken style:\n${responseStyle}`);

  return sections.join("\n\n");
}

/**
 * Reads the consult question from tool-call arguments.
 *
 * The `question`, `prompt`, `query`, and `task` fields are offered to
 * `firstString` in that order.
 *
 * @param args Raw tool-call arguments (an object or a string).
 * @returns Whatever `firstString` selects from those fields.
 */
function readRealtimeConsultQuestion(args: unknown) {
  const consultArgs = readRealtimeConsultArgs(args);
  return firstString(
    consultArgs?.question,
    consultArgs?.prompt,
    consultArgs?.query,
    consultArgs?.task,
  );
}

/**
 * Coerces consult tool-call arguments into a record.
 *
 * Non-string input goes straight through `asRecord`. String input is parsed as
 * JSON first; when parsing fails, or the parsed value is not accepted by
 * `asRecord`, the original string is treated as the question itself.
 *
 * @param args Raw tool-call arguments.
 * @returns The arguments as a record, `{ question: args }` for plain strings,
 *   or whatever `asRecord` returns for other non-record input.
 */
function readRealtimeConsultArgs(args: unknown) {
  if (typeof args !== "string") return asRecord(args);

  try {
    return asRecord(JSON.parse(args)) ?? { question: args };
  } catch {
    // Not JSON: the whole string is the question.
    return { question: args };
  }
}

/**
 * Finds the assistant reply that follows the consult question in history.
 *
 * It locates the last user message matching the question, then returns the
 * first non-blank assistant message after it. Questions shorter than 8
 * characters after normalization are rejected as too weak to match on.
 *
 * NOTE(review): the scan does not stop at a later user message, so a reply to
 * a different, later question could be picked up — confirm this is acceptable.
 *
 * @param messages Normalized history, oldest first.
 * @param question Consult question text.
 * @returns The reply content, or `""` when there is no match.
 */
function selectAssistantAfterMatchingQuestion(messages: RealtimeHistoryMessage[], question: string) {
  const needle = normalizeHistoryText(question);
  if (needle.length < 8) return "";

  const anchorIndex = messages.findLastIndex(
    (entry) => entry.role === "user" && historyTextMatchesQuestion(entry.content, needle),
  );
  if (anchorIndex < 0) return "";

  for (const entry of messages.slice(anchorIndex + 1)) {
    if (entry.role === "assistant" && entry.content.trim()) return entry.content;
  }
  return "";
}

/**
 * Loosely compares a history message with an already-normalized question.
 *
 * They match when the normalized content contains the question (which also
 * covers exact equality), or when the content is at least 8 characters long
 * and is itself contained in the question.
 *
 * @param content Raw message text from history.
 * @param normalizedQuestion Question text already passed through normalizeHistoryText.
 * @returns `true` when the two are considered the same question.
 */
function historyTextMatchesQuestion(content: string, normalizedQuestion: string) {
  const candidate = normalizeHistoryText(content);
  if (!candidate) return false;

  if (candidate.includes(normalizedQuestion)) return true;

  // The length floor keeps short fragments from matching arbitrary questions.
  return candidate.length >= 8 && normalizedQuestion.includes(candidate);
}

/**
 * Canonicalizes text for comparison: trims the ends, collapses every run of
 * whitespace to a single space, and lower-cases the result.
 *
 * @param value Text to normalize.
 * @returns The normalized text; `""` for empty or whitespace-only input.
 */
function normalizeHistoryText(value: string) {
  const words = value.trim().split(/\s+/);
  return words.join(" ").toLowerCase();
}

function readRequiredString(value: unknown, name: string) {
if (typeof value === "string" && value.trim().length > 0) return value.trim();
throw new ValidationError(`${name} is required`);
Expand Down
Loading