diff --git a/src/app/api/runtimes/[id]/talk/realtime/relay/route.test.ts b/src/app/api/runtimes/[id]/talk/realtime/relay/route.test.ts index 4dceeb2..43a6404 100644 --- a/src/app/api/runtimes/[id]/talk/realtime/relay/route.test.ts +++ b/src/app/api/runtimes/[id]/talk/realtime/relay/route.test.ts @@ -311,6 +311,8 @@ describe("POST /api/runtimes/[id]/talk/realtime/relay", () => { const response = await responsePromise; expect(response.status).toBe(200); + const body = await response.json(); + expect(body.result.finalText).toBe("The repo is a CrewCMD app."); const persistedEvents = mockPersistChatProgressEvent.mock.calls.map((call) => call[0].payload.event); expect(persistedEvents).toContain("run_started"); expect(persistedEvents).toContain("tool_started"); @@ -334,6 +336,68 @@ describe("POST /api/runtimes/[id]/talk/realtime/relay", () => { })); }); + it("prefers OpenClaw source replies over completion-only final text", async () => { + let gatewayHandler: ((payload: unknown) => void) | null = null; + const client = { + realtimeClientToolCall: vi.fn().mockResolvedValue({ runId: "run_1" }), + realtimeRelayToolResult: vi.fn().mockResolvedValue({ ok: true }), + on: vi.fn((event: string, handler: (payload: unknown) => void) => { + if (event === "*") gatewayHandler = handler; + }), + off: vi.fn(), + }; + mockRuntimeRows.push({ id: "rt_1", ownerUserId: "user_1" }); + mockGetGatewayClientForRuntime.mockResolvedValue(client); + + const responsePromise = POST( + new Request("http://localhost/api/runtimes/rt_1/talk/realtime/relay", { + method: "POST", + body: JSON.stringify({ + action: "toolCall", + relaySessionId: "relay_1", + sessionKey: "main", + callId: "call_1", + name: "openclaw_agent_consult", + args: { question: "Summarize the README" }, + }), + }), + { params: Promise.resolve({ id: "rt_1" }) }, + ); + + await vi.waitFor(() => expect(gatewayHandler).toBeTypeOf("function")); + + (gatewayHandler as ((payload: unknown) => void) | null)?.({ + event: "chat", + runId: "run_1", + state: "final", + message: { + role: "assistant", + content: [{ type: "text", text: "Done." }], + details: { + sourceReply: { + text: "The README says product-videogen creates product videos from prompts and assets.", + }, + }, + }, + }); + + const response = await responsePromise; + expect(response.status).toBe(200); + await expect(response.json()).resolves.toEqual({ + result: { + delegated: true, + runId: "run_1", + finalText: "The README says product-videogen creates product videos from prompts and assets.", + result: { ok: true }, + }, + }); + expect(client.realtimeRelayToolResult).toHaveBeenNthCalledWith(2, { + relaySessionId: "relay_1", + callId: "call_1", + result: { result: "The README says product-videogen creates product videos from prompts and assets." }, + }); + }); + it("extracts final realtime consult text from OpenClaw trace artifacts", async () => { let gatewayHandler: ((payload: unknown) => void) | null = null; const client = { @@ -536,6 +600,81 @@ describe("POST /api/runtimes/[id]/talk/realtime/relay", () => { }); }); + it("skips completion-status chat history entries when recovering realtime consult text", async () => { + let gatewayHandler: ((payload: unknown) => void) | null = null; + const client = { + realtimeClientToolCall: vi.fn().mockResolvedValue({ ok: true, runId: "run_1" }), + realtimeRelayToolResult: vi.fn().mockResolvedValue({ ok: true }), + chatHistory: vi.fn().mockResolvedValue({ + messages: [ + { + role: "user", + content: [{ type: "text", text: "Can you summarize the README?" }], + }, + { + role: "assistant", + content: [{ + type: "text", + text: + "I checked, but the result didn’t include the README contents, just a completion status.", + }], + }, + { + role: "assistant", + content: [{ + type: "text", + text: + "The README describes product-videogen as a product video generation workflow with prompt, asset, and rendering steps.", + }], + }, + ], + }), + on: vi.fn((event: string, handler: (payload: unknown) => void) => { + if (event === "*") gatewayHandler = handler; + }), + off: vi.fn(), + }; + mockRuntimeRows.push({ id: "rt_1", ownerUserId: "user_1" }); + mockGetGatewayClientForRuntime.mockResolvedValue(client); + + const responsePromise = POST( + new Request("http://localhost/api/runtimes/rt_1/talk/realtime/relay", { + method: "POST", + body: JSON.stringify({ + action: "toolCall", + relaySessionId: "relay_1", + sessionKey: "main", + callId: "call_1", + name: "openclaw_agent_consult", + args: { question: "Can you summarize the README?" }, + }), + }), + { params: Promise.resolve({ id: "rt_1" }) }, + ); + + await vi.waitFor(() => expect(gatewayHandler).toBeTypeOf("function")); + + (gatewayHandler as ((payload: unknown) => void) | null)?.({ + event: "chat", + runId: "run_1", + state: "final", + message: { role: "assistant", content: [{ type: "text", text: "Done." }] }, + }); + + const response = await responsePromise; + expect(response.status).toBe(200); + await expect(response.json()).resolves.toEqual({ + result: { + delegated: true, + runId: "run_1", + finalText: + "The README describes product-videogen as a product video generation workflow with prompt, asset, and rendering steps.", + result: { ok: true }, + }, + }); + expect(client.chatHistory).toHaveBeenCalledWith({ sessionKey: "main", limit: 25 }); + }); + it("rejects invalid relay actions before calling the gateway", async () => { mockRuntimeRows.push({ id: "rt_1", ownerUserId: "user_1" }); diff --git a/src/app/api/runtimes/[id]/talk/realtime/relay/route.ts b/src/app/api/runtimes/[id]/talk/realtime/relay/route.ts index f439562..6fcf12d 100644 --- a/src/app/api/runtimes/[id]/talk/realtime/relay/route.ts +++ b/src/app/api/runtimes/[id]/talk/realtime/relay/route.ts @@ -6,7 +6,7 @@ import { buildRuntimeReadWhere, getAgentAccessContext } from "@/lib/agent-access import { canAccessChatSession } from "@/lib/chat-session-access"; import { publishChatProgressEvent } from "@/lib/chat-pubsub"; import { persistChatProgressEvent } from "@/lib/chat-session-events"; -import { selectRecoveredAssistantText } from "@/lib/chat-recovery"; +import { isAssistantDeliveryPlaceholder, selectRecoveredAssistantText } from "@/lib/chat-recovery"; import { getGatewayClientForRuntime, holdClient, releaseClient } from "@/lib/gateway-chat-pool"; import type { GatewayClient } from "@/lib/gateway-client"; @@ -237,6 +237,7 @@ function waitForChatFinal(params: { let historyPollTimeout: ReturnType | null = null; let deferredFinalText: string | null = null; let deferredFinalDeadlineMs: number | null = null; + let latestSourceReplyText: string | null = null; const timer = setTimeout(() => { void recoverFromHistory("timeout").then((recovered) => { @@ -288,6 +289,10 @@ function waitForChatFinal(params: { resolveWithText(recovered); return recovered; } + if (latestSourceReplyText && !settled) { + resolveWithText(latestSourceReplyText); + return latestSourceReplyText; + } if (!maybeResolveDeferredFinal()) scheduleHistoryPoll(); return ""; } catch (error) { @@ -307,14 +312,21 @@ function waitForChatFinal(params: { const runIds = extractEventRunIds(event); if (!runIds.includes(runId)) return; + const eventSourceReplyText = extractSourceReplyText(event); + if (eventSourceReplyText) latestSourceReplyText = eventSourceReplyText; + const toolProgress = extractToolProgress(event); if (toolProgress) void audit.publish(toolProgress.event, { activeTool: toolProgress.activeTool }); const state = firstString(event.state, event.status)?.toLowerCase(); - if (state === "final" || state === "complete" || state === "completed") { - const text = extractText(event.message) || extractText(event); + if (isRealtimeConsultTerminalResultEvent(event, state, eventSourceReplyText)) { + const text = extractSourceReplyText(event.message) || eventSourceReplyText || extractText(event.message) || extractText(event); if (text) { if (isRealtimeConsultPlaceholderText(text)) { + if (latestSourceReplyText) { + resolveWithText(latestSourceReplyText); + return; + } deferredFinalText = text; deferredFinalDeadlineMs ??= Date.now() + REALTIME_EMPTY_FINAL_HISTORY_GRACE_MS; void recoverFromHistory("placeholder-final"); @@ -429,7 +441,7 @@ async function recoverRealtimeConsultTextFromHistory(params: { messages, currentUserContents: [consultMessage], }); - if (recovered) return recovered; + if (recovered && !isRealtimeConsultPlaceholderText(recovered)) return recovered; } const question = readRealtimeConsultQuestion(params.args); @@ -456,7 +468,7 @@ function extractHistoryMessages(result: unknown): RealtimeHistoryMessage[] { .map((item) => { const message = asRecord(item); const role = firstString(message?.role, message?.type, message?.author)?.toLowerCase() ?? null; - const content = extractText(message?.content ?? message?.message ?? message); + const content = extractSourceReplyText(message) || extractText(message?.content ?? message?.message ?? message); const idempotencyKey = firstString(message?.idempotencyKey, message?.idempotency_key) ?? null; return { role, content, idempotencyKey }; }) @@ -467,6 +479,7 @@ function selectHistoryAssistantByRunId(messages: RealtimeHistoryMessage[], runId const recovered = messages.findLast((message) => message.role === "assistant" && message.content.trim() && + !isRealtimeConsultPlaceholderText(message.content) && Boolean( message.idempotencyKey === runId || message.idempotencyKey?.startsWith(`${runId}:`) @@ -488,13 +501,31 @@ function buildRealtimeConsultChatMessage(args: unknown) { } function isRealtimeConsultPlaceholderText(text: string) { + if (isAssistantDeliveryPlaceholder(text)) return true; const normalized = normalizeHistoryText(text).replace(/[.!?]+$/g, ""); + const ascii = normalized.replace(/[’‘]/g, "'"); + const mentionsMissingResult = ascii.includes("the result didn't include") || + ascii.includes("the result did not include") || + ascii.includes("result didn't include") || + ascii.includes("result did not include") || + ascii.includes("just a completion status") || + ascii.includes("only a completion status") || + ascii.includes("completion status"); + const asksForInputAgain = ascii.includes("paste the readme") || + ascii.includes("share the readme text") || + ascii.includes("run the check again") || + ascii.includes("without the actual text") || + ascii.includes("without the actual content") || + ascii.includes("don't have the openclaw result") || + ascii.includes("do not have the openclaw result"); return normalized === "done" || normalized === "complete" || normalized === "completed" || normalized === "ok" || normalized === "okay" || - normalized === "all set"; + normalized === "all set" || + mentionsMissingResult || + asksForInputAgain; } function readRealtimeConsultQuestion(args: unknown) { @@ -527,7 +558,11 @@ function selectAssistantAfterMatchingQuestion(messages: RealtimeHistoryMessage[] const recovered = messages .slice(lastMatchingUserIndex + 1) - .find((message) => message.role === "assistant" && message.content.trim()); + .find((message) => + message.role === "assistant" && + message.content.trim() && + !isRealtimeConsultPlaceholderText(message.content) + ); return recovered?.content ?? ""; } @@ -539,6 +574,23 @@ function historyTextMatchesQuestion(content: string, normalizedQuestion: string) (normalizedContent.length >= 8 && normalizedQuestion.includes(normalizedContent)); } +function isRealtimeConsultTerminalResultEvent( + event: Record, + state: string | undefined, + sourceReplyText: string, +) { + if (state !== "final" && state !== "complete" && state !== "completed") return false; + + const eventName = firstString(event.event, event.type, event.kind)?.toLowerCase() ?? ""; + if (eventName.includes("tool") && !sourceReplyText) return false; + if (eventName === "chat" || eventName === "trace.artifacts") return true; + if (asRecord(event.message)) return true; + if (sourceReplyText) return true; + + const data = asRecord(event.data) ?? asRecord(event.payload); + return Array.isArray(data?.assistantTexts) || Array.isArray(event.assistantTexts); +} + function normalizeHistoryText(value: string) { return value.trim().replace(/\s+/g, " ").toLowerCase(); } @@ -581,6 +633,67 @@ function firstString(...values: unknown[]) { return undefined; } +function parseJsonRecord(value: string) { + const trimmed = value.trim(); + if (!trimmed || (!trimmed.startsWith("{") && !trimmed.startsWith("["))) return null; + try { + return JSON.parse(trimmed) as unknown; + } catch { + return null; + } +} + +function extractSourceReplyText(value: unknown, seen = new WeakSet()): string { + if (!value) return ""; + + if (typeof value === "string") { + const parsed = parseJsonRecord(value); + return parsed ? extractSourceReplyText(parsed, seen) : ""; + } + + if (Array.isArray(value)) { + for (const item of value) { + const text = extractSourceReplyText(item, seen); + if (text) return text; + } + return ""; + } + + const record = asRecord(value); + if (!record) return ""; + if (seen.has(record)) return ""; + seen.add(record); + + const sourceReply = asRecord(record.sourceReply) ?? asRecord(record.source_reply); + const direct = sourceReply + ? firstString(sourceReply.text, sourceReply.message, sourceReply.content) || extractText(sourceReply) + : firstString(record.sourceReplyText, record.source_reply_text); + if (direct && !isRealtimeConsultPlaceholderText(direct)) return direct; + + for (const key of [ + "content", + "text", + "output", + "result", + "response", + "data", + "payload", + "message", + "details", + "metadata", + "__openclaw", + "toolResult", + "tool_result", + "arguments", + "args", + ]) { + const text = extractSourceReplyText(record[key], seen); + if (text) return text; + } + + return ""; +} + function extractEventRunIds(payload: Record) { return [ payload.runId,