From a9b055718b675cd06aa6371b2a9926b5ccd90218 Mon Sep 17 00:00:00 2001 From: Roger Chappel Date: Mon, 1 Jun 2026 10:25:50 +1000 Subject: [PATCH] fix: extend realtime openclaw consult wait --- .../[id]/talk/realtime/relay/route.test.ts | 68 +++++ .../[id]/talk/realtime/relay/route.ts | 233 +++++++++++++++++- 2 files changed, 288 insertions(+), 13 deletions(-) diff --git a/src/app/api/runtimes/[id]/talk/realtime/relay/route.test.ts b/src/app/api/runtimes/[id]/talk/realtime/relay/route.test.ts index 00604a9..37e9fd1 100644 --- a/src/app/api/runtimes/[id]/talk/realtime/relay/route.test.ts +++ b/src/app/api/runtimes/[id]/talk/realtime/relay/route.test.ts @@ -400,6 +400,74 @@ describe("POST /api/runtimes/[id]/talk/realtime/relay", () => { expect(mockReleaseClient).toHaveBeenCalledWith(client); }); + it("recovers empty final realtime consult events from chat history", async () => { + let gatewayHandler: ((payload: unknown) => void) | null = null; + const client = { + realtimeClientToolCall: vi.fn().mockResolvedValue({ ok: true, runId: "run_1" }), + realtimeRelayToolResult: vi.fn().mockResolvedValue({ ok: true }), + chatHistory: vi.fn().mockResolvedValue({ + messages: [ + { + role: "user", + content: [{ type: "text", text: "Inspect this repo" }], + }, + { + role: "assistant", + idempotencyKey: "run_1", + content: [{ type: "text", text: "Recovered from the durable transcript." }], + }, + ], + }), + on: vi.fn((event: string, handler: (payload: unknown) => void) => { + if (event === "*") gatewayHandler = handler; + }), + off: vi.fn(), + }; + mockRuntimeRows.push({ id: "rt_1", ownerUserId: "user_1" }); + mockGetGatewayClientForRuntime.mockResolvedValue(client); + + const responsePromise = POST( + new Request("http://localhost/api/runtimes/rt_1/talk/realtime/relay", { + method: "POST", + body: JSON.stringify({ + action: "toolCall", + relaySessionId: "relay_1", + sessionKey: "main", + callId: "call_1", + name: "openclaw_agent_consult", + args: { prompt: "Inspect this repo" }, + }), + }), + { params: Promise.resolve({ id: "rt_1" }) }, + ); + + await vi.waitFor(() => expect(gatewayHandler).toBeTypeOf("function")); + + (gatewayHandler as ((payload: unknown) => void) | null)?.({ + event: "chat", + runId: "run_1", + state: "final", + message: { role: "assistant", content: [] }, + }); + + const response = await responsePromise; + expect(response.status).toBe(200); + await expect(response.json()).resolves.toEqual({ + result: { + delegated: true, + runId: "run_1", + finalText: "Recovered from the durable transcript.", + result: { ok: true }, + }, + }); + expect(client.chatHistory).toHaveBeenCalledWith({ sessionKey: "main", limit: 25 }); + expect(client.realtimeRelayToolResult).toHaveBeenNthCalledWith(2, { + relaySessionId: "relay_1", + callId: "call_1", + result: { text: "Recovered from the durable transcript." }, + }); + }); + it("rejects invalid relay actions before calling the gateway", async () => { mockRuntimeRows.push({ id: "rt_1", ownerUserId: "user_1" }); diff --git a/src/app/api/runtimes/[id]/talk/realtime/relay/route.ts b/src/app/api/runtimes/[id]/talk/realtime/relay/route.ts index 0212002..afdffa6 100644 --- a/src/app/api/runtimes/[id]/talk/realtime/relay/route.ts +++ b/src/app/api/runtimes/[id]/talk/realtime/relay/route.ts @@ -6,11 +6,16 @@ import { buildRuntimeReadWhere, getAgentAccessContext } from "@/lib/agent-access import { canAccessChatSession } from "@/lib/chat-session-access"; import { publishChatProgressEvent } from "@/lib/chat-pubsub"; import { persistChatProgressEvent } from "@/lib/chat-session-events"; +import { selectRecoveredAssistantText } from "@/lib/chat-recovery"; import { getGatewayClientForRuntime, holdClient, releaseClient } from "@/lib/gateway-chat-pool"; import type { GatewayClient } from "@/lib/gateway-client"; export const dynamic = "force-dynamic"; -export const maxDuration = 120; +export const maxDuration = 620; + +const REALTIME_TOOL_RESULT_TIMEOUT_MS = 10 * 60_000; +const REALTIME_HISTORY_POLL_INTERVAL_MS = 5_000; +const REALTIME_EMPTY_FINAL_HISTORY_GRACE_MS = 30_000; type RelayAction = "audio" | "cancelOutput" | "mark" | "toolCall" | "toolResult" | "stop"; @@ -172,7 +177,13 @@ async function runRealtimeToolCall( options: { willContinue: true }, }); - const text = await waitForChatFinal(client, runId, audit); + const text = await waitForChatFinal({ + client, + runId, + sessionKey: params.sessionKey, + args: params.args, + audit, + }); const result = await client.realtimeRelayToolResult({ relaySessionId: params.relaySessionId, callId: params.callId, @@ -203,24 +214,92 @@ function buildRealtimeToolWorkingResult() { }; } -function waitForChatFinal( - client: GatewayClient, - runId: string, - audit: RealtimeAuditPublisher, - timeoutMs = 110_000, -) { +function waitForChatFinal(params: { + client: GatewayClient; + runId: string; + sessionKey: string; + args: unknown; + audit: RealtimeAuditPublisher; + timeoutMs?: number; +}) { + const { + client, + runId, + sessionKey, + args, + audit, + timeoutMs = REALTIME_TOOL_RESULT_TIMEOUT_MS, + } = params; + return new Promise((resolve, reject) => { + let settled = false; + let historyPollInFlight = false; + let historyPollTimeout: ReturnType | null = null; + let emptyFinalDeadlineMs: number | null = null; + const timer = setTimeout(() => { - cleanup(); - void audit.publish("run_error", { error: "Timed out waiting for OpenClaw realtime tool result" }); - reject(new Error("Timed out waiting for OpenClaw realtime tool result")); + void recoverFromHistory("timeout").then((recovered) => { + if (recovered || settled) return; + cleanup(); + void audit.publish("run_error", { error: "Timed out waiting for OpenClaw realtime tool result" }); + reject(new Error("Timed out waiting for OpenClaw realtime tool result")); + }); }, timeoutMs); const cleanup = () => { + settled = true; clearTimeout(timer); + if (historyPollTimeout) clearTimeout(historyPollTimeout); + historyPollTimeout = null; client.off("*", onEvent); }; + const resolveWithText = (text: string) => { + cleanup(); + resolve(text); + }; + + const maybeResolveEmptyFinal = () => { + if (settled || emptyFinalDeadlineMs === null || Date.now() < emptyFinalDeadlineMs) return false; + resolveWithText("OpenClaw completed without returning text."); + return true; + }; + + const scheduleHistoryPoll = () => { + if (settled || historyPollTimeout) return; + historyPollTimeout = setTimeout(() => { + historyPollTimeout = null; + void recoverFromHistory("poll"); + }, REALTIME_HISTORY_POLL_INTERVAL_MS); + }; + + const recoverFromHistory = async (reason: string) => { + if (settled || historyPollInFlight) return ""; + historyPollInFlight = true; + try { + const recovered = await recoverRealtimeConsultTextFromHistory({ + client, + runId, + sessionKey, + args, + }); + if (recovered && !settled) { + resolveWithText(recovered); + return recovered; + } + if (!maybeResolveEmptyFinal()) scheduleHistoryPoll(); + return ""; + } catch (error) { + if (reason !== "poll") { + console.error(`[api/realtime/relay] Failed to recover realtime consult text from history (${reason}):`, error); + } + if (!maybeResolveEmptyFinal()) scheduleHistoryPoll(); + return ""; + } finally { + historyPollInFlight = false; + } + }; + const onEvent = (payload: unknown) => { const event = asRecord(payload); if (!event) return; @@ -233,8 +312,12 @@ function waitForChatFinal( const state = firstString(event.state, event.status)?.toLowerCase(); if (state === "final" || state === "complete" || state === "completed") { const text = extractText(event.message) || extractText(event); - cleanup(); - resolve(text || "OpenClaw completed without returning text."); + if (text) { + resolveWithText(text); + return; + } + emptyFinalDeadlineMs ??= Date.now() + REALTIME_EMPTY_FINAL_HISTORY_GRACE_MS; + void recoverFromHistory("empty-final"); return; } @@ -248,6 +331,7 @@ function waitForChatFinal( }; client.on("*", onEvent); + scheduleHistoryPoll(); }); } @@ -318,6 +402,129 @@ async function resolveAuditSession(request: Request, sessionKey: string) { return null; } +async function recoverRealtimeConsultTextFromHistory(params: { + client: Pick; + runId: string; + sessionKey: string; + args: unknown; +}) { + const result = await params.client.chatHistory({ sessionKey: params.sessionKey, limit: 25 }); + const messages = extractHistoryMessages(result); + if (messages.length === 0) return ""; + + const byRunId = selectHistoryAssistantByRunId(messages, params.runId); + if (byRunId) return byRunId; + + const consultMessage = buildRealtimeConsultChatMessage(params.args); + if (consultMessage) { + const recovered = selectRecoveredAssistantText({ + messages, + currentUserContents: [consultMessage], + }); + if (recovered) return recovered; + } + + const question = readRealtimeConsultQuestion(params.args); + return question ? selectAssistantAfterMatchingQuestion(messages, question) : ""; +} + +type RealtimeHistoryMessage = { + role: string | null; + content: string; + idempotencyKey: string | null; +}; + +function extractHistoryMessages(result: unknown): RealtimeHistoryMessage[] { + const record = asRecord(result); + const rawMessages = Array.isArray(record?.messages) + ? record.messages + : Array.isArray(record?.items) + ? record.items + : Array.isArray(result) + ? result + : []; + + return rawMessages + .map((item) => { + const message = asRecord(item); + const role = firstString(message?.role, message?.type, message?.author)?.toLowerCase() ?? null; + const content = extractText(message?.content ?? message?.message ?? message); + const idempotencyKey = firstString(message?.idempotencyKey, message?.idempotency_key) ?? null; + return { role, content, idempotencyKey }; + }) + .filter((message) => message.content); +} + +function selectHistoryAssistantByRunId(messages: RealtimeHistoryMessage[], runId: string) { + const recovered = messages.findLast((message) => + message.role === "assistant" && + message.content.trim() && + Boolean( + message.idempotencyKey === runId || + message.idempotencyKey?.startsWith(`${runId}:`) + ) + ); + return recovered?.content ?? ""; +} + +function buildRealtimeConsultChatMessage(args: unknown) { + const record = readRealtimeConsultArgs(args); + const question = readRealtimeConsultQuestion(record); + if (!question) return ""; + + return [ + question, + firstString(record?.context) ? `Context:\n${firstString(record?.context)}` : null, + firstString(record?.responseStyle) ? `Spoken style:\n${firstString(record?.responseStyle)}` : null, + ].filter(Boolean).join("\n\n"); +} + +function readRealtimeConsultQuestion(args: unknown) { + const record = readRealtimeConsultArgs(args); + return firstString(record?.question, record?.prompt, record?.query, record?.task); +} + +function readRealtimeConsultArgs(args: unknown) { + if (typeof args === "string") { + try { + const parsed = JSON.parse(args); + const record = asRecord(parsed); + return record ?? { question: args }; + } catch { + return { question: args }; + } + } + return asRecord(args); +} + +function selectAssistantAfterMatchingQuestion(messages: RealtimeHistoryMessage[], question: string) { + const normalizedQuestion = normalizeHistoryText(question); + if (normalizedQuestion.length < 8) return ""; + + const lastMatchingUserIndex = messages.findLastIndex((message) => + message.role === "user" && + historyTextMatchesQuestion(message.content, normalizedQuestion) + ); + if (lastMatchingUserIndex < 0) return ""; + + const recovered = messages + .slice(lastMatchingUserIndex + 1) + .find((message) => message.role === "assistant" && message.content.trim()); + return recovered?.content ?? ""; +} + +function historyTextMatchesQuestion(content: string, normalizedQuestion: string) { + const normalizedContent = normalizeHistoryText(content); + if (!normalizedContent) return false; + return normalizedContent === normalizedQuestion || + normalizedContent.includes(normalizedQuestion) || + (normalizedContent.length >= 8 && normalizedQuestion.includes(normalizedContent)); +} + +function normalizeHistoryText(value: string) { + return value.trim().replace(/\s+/g, " ").toLowerCase(); +} + function readRequiredString(value: unknown, name: string) { if (typeof value === "string" && value.trim().length > 0) return value.trim(); throw new ValidationError(`${name} is required`);