Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 139 additions & 0 deletions src/app/api/runtimes/[id]/talk/realtime/relay/route.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,8 @@ describe("POST /api/runtimes/[id]/talk/realtime/relay", () => {

const response = await responsePromise;
expect(response.status).toBe(200);
const body = await response.json();
expect(body.result.finalText).toBe("The repo is a CrewCMD app.");
const persistedEvents = mockPersistChatProgressEvent.mock.calls.map((call) => call[0].payload.event);
expect(persistedEvents).toContain("run_started");
expect(persistedEvents).toContain("tool_started");
Expand All @@ -334,6 +336,68 @@ describe("POST /api/runtimes/[id]/talk/realtime/relay", () => {
}));
});

// Scenario: the terminal chat event carries only a completion-style message
// ("Done.") but also embeds the real answer under message.details.sourceReply.
// The route is expected to surface the embedded source reply as the final text.
it("prefers OpenClaw source replies over completion-only final text", async () => {
// Captured wildcard handler; lets the test push gateway events into the route.
let gatewayHandler: ((payload: unknown) => void) | null = null;
// Fake gateway client: tool calls resolve immediately with a run id, and the
// "*" subscription is recorded instead of being wired to a real gateway.
const client = {
realtimeClientToolCall: vi.fn().mockResolvedValue({ runId: "run_1" }),
realtimeRelayToolResult: vi.fn().mockResolvedValue({ ok: true }),
on: vi.fn((event: string, handler: (payload: unknown) => void) => {
if (event === "*") gatewayHandler = handler;
}),
off: vi.fn(),
};
mockRuntimeRows.push({ id: "rt_1", ownerUserId: "user_1" });
mockGetGatewayClientForRuntime.mockResolvedValue(client);

// Start the request without awaiting it: the response is only produced after
// the terminal gateway event below has been delivered.
const responsePromise = POST(
new Request("http://localhost/api/runtimes/rt_1/talk/realtime/relay", {
method: "POST",
body: JSON.stringify({
action: "toolCall",
relaySessionId: "relay_1",
sessionKey: "main",
callId: "call_1",
name: "openclaw_agent_consult",
args: { question: "Summarize the README" },
}),
}),
{ params: Promise.resolve({ id: "rt_1" }) },
);

// Wait until the route has registered its wildcard handler on the client.
await vi.waitFor(() => expect(gatewayHandler).toBeTypeOf("function"));

// Emit the terminal event: visible content is a bare "Done.", the useful
// answer lives in details.sourceReply.text.
(gatewayHandler as ((payload: unknown) => void) | null)?.({
event: "chat",
runId: "run_1",
state: "final",
message: {
role: "assistant",
content: [{ type: "text", text: "Done." }],
details: {
sourceReply: {
text: "The README says product-videogen creates product videos from prompts and assets.",
},
},
},
});

const response = await responsePromise;
expect(response.status).toBe(200);
// finalText must be the source reply, not the "Done." content.
await expect(response.json()).resolves.toEqual({
result: {
delegated: true,
runId: "run_1",
finalText: "The README says product-videogen creates product videos from prompts and assets.",
result: { ok: true },
},
});
// The second relay tool-result call must carry the same source reply text.
expect(client.realtimeRelayToolResult).toHaveBeenNthCalledWith(2, {
relaySessionId: "relay_1",
callId: "call_1",
result: { result: "The README says product-videogen creates product videos from prompts and assets." },
});
});

it("extracts final realtime consult text from OpenClaw trace artifacts", async () => {
let gatewayHandler: ((payload: unknown) => void) | null = null;
const client = {
Expand Down Expand Up @@ -536,6 +600,81 @@ describe("POST /api/runtimes/[id]/talk/realtime/relay", () => {
});
});

// Scenario: the terminal chat event is a bare "Done." with no source reply, so
// the answer has to come from chat history. The history holds two assistant
// messages after the matching question: the first only apologises that the
// result was "just a completion status", the second holds the real summary.
// The expected final text is the second message.
it("skips completion-status chat history entries when recovering realtime consult text", async () => {
// Captured wildcard handler; lets the test push gateway events into the route.
let gatewayHandler: ((payload: unknown) => void) | null = null;
const client = {
realtimeClientToolCall: vi.fn().mockResolvedValue({ ok: true, runId: "run_1" }),
realtimeRelayToolResult: vi.fn().mockResolvedValue({ ok: true }),
// Canned history returned for the session: question, apology, real answer.
chatHistory: vi.fn().mockResolvedValue({
messages: [
{
role: "user",
content: [{ type: "text", text: "Can you summarize the README?" }],
},
{
role: "assistant",
content: [{
type: "text",
text:
"I checked, but the result didn’t include the README contents, just a completion status.",
}],
},
{
role: "assistant",
content: [{
type: "text",
text:
"The README describes product-videogen as a product video generation workflow with prompt, asset, and rendering steps.",
}],
},
],
}),
on: vi.fn((event: string, handler: (payload: unknown) => void) => {
if (event === "*") gatewayHandler = handler;
}),
off: vi.fn(),
};
mockRuntimeRows.push({ id: "rt_1", ownerUserId: "user_1" });
mockGetGatewayClientForRuntime.mockResolvedValue(client);

// Start the request without awaiting it: the response is only produced after
// the terminal gateway event below has been delivered.
const responsePromise = POST(
new Request("http://localhost/api/runtimes/rt_1/talk/realtime/relay", {
method: "POST",
body: JSON.stringify({
action: "toolCall",
relaySessionId: "relay_1",
sessionKey: "main",
callId: "call_1",
name: "openclaw_agent_consult",
args: { question: "Can you summarize the README?" },
}),
}),
{ params: Promise.resolve({ id: "rt_1" }) },
);

// Wait until the route has registered its wildcard handler on the client.
await vi.waitFor(() => expect(gatewayHandler).toBeTypeOf("function"));

// Terminal event with placeholder-only content and no embedded source reply.
(gatewayHandler as ((payload: unknown) => void) | null)?.({
event: "chat",
runId: "run_1",
state: "final",
message: { role: "assistant", content: [{ type: "text", text: "Done." }] },
});

const response = await responsePromise;
expect(response.status).toBe(200);
// finalText must be the second assistant history entry, not the apology.
await expect(response.json()).resolves.toEqual({
result: {
delegated: true,
runId: "run_1",
finalText:
"The README describes product-videogen as a product video generation workflow with prompt, asset, and rendering steps.",
result: { ok: true },
},
});
// History is requested for the request's session key with a 25-message limit.
expect(client.chatHistory).toHaveBeenCalledWith({ sessionKey: "main", limit: 25 });
});

it("rejects invalid relay actions before calling the gateway", async () => {
mockRuntimeRows.push({ id: "rt_1", ownerUserId: "user_1" });

Expand Down
127 changes: 120 additions & 7 deletions src/app/api/runtimes/[id]/talk/realtime/relay/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { buildRuntimeReadWhere, getAgentAccessContext } from "@/lib/agent-access
import { canAccessChatSession } from "@/lib/chat-session-access";
import { publishChatProgressEvent } from "@/lib/chat-pubsub";
import { persistChatProgressEvent } from "@/lib/chat-session-events";
import { selectRecoveredAssistantText } from "@/lib/chat-recovery";
import { isAssistantDeliveryPlaceholder, selectRecoveredAssistantText } from "@/lib/chat-recovery";
import { getGatewayClientForRuntime, holdClient, releaseClient } from "@/lib/gateway-chat-pool";
import type { GatewayClient } from "@/lib/gateway-client";

Expand Down Expand Up @@ -237,6 +237,7 @@ function waitForChatFinal(params: {
let historyPollTimeout: ReturnType<typeof setTimeout> | null = null;
let deferredFinalText: string | null = null;
let deferredFinalDeadlineMs: number | null = null;
let latestSourceReplyText: string | null = null;

const timer = setTimeout(() => {
void recoverFromHistory("timeout").then((recovered) => {
Expand Down Expand Up @@ -288,6 +289,10 @@ function waitForChatFinal(params: {
resolveWithText(recovered);
return recovered;
}
if (latestSourceReplyText && !settled) {
resolveWithText(latestSourceReplyText);
return latestSourceReplyText;
}
if (!maybeResolveDeferredFinal()) scheduleHistoryPoll();
return "";
} catch (error) {
Expand All @@ -307,14 +312,21 @@ function waitForChatFinal(params: {
const runIds = extractEventRunIds(event);
if (!runIds.includes(runId)) return;

const eventSourceReplyText = extractSourceReplyText(event);
if (eventSourceReplyText) latestSourceReplyText = eventSourceReplyText;

const toolProgress = extractToolProgress(event);
if (toolProgress) void audit.publish(toolProgress.event, { activeTool: toolProgress.activeTool });

const state = firstString(event.state, event.status)?.toLowerCase();
if (state === "final" || state === "complete" || state === "completed") {
const text = extractText(event.message) || extractText(event);
if (isRealtimeConsultTerminalResultEvent(event, state, eventSourceReplyText)) {
const text = extractSourceReplyText(event.message) || eventSourceReplyText || extractText(event.message) || extractText(event);
if (text) {
if (isRealtimeConsultPlaceholderText(text)) {
if (latestSourceReplyText) {
resolveWithText(latestSourceReplyText);
return;
}
deferredFinalText = text;
deferredFinalDeadlineMs ??= Date.now() + REALTIME_EMPTY_FINAL_HISTORY_GRACE_MS;
void recoverFromHistory("placeholder-final");
Expand Down Expand Up @@ -429,7 +441,7 @@ async function recoverRealtimeConsultTextFromHistory(params: {
messages,
currentUserContents: [consultMessage],
});
if (recovered) return recovered;
if (recovered && !isRealtimeConsultPlaceholderText(recovered)) return recovered;
}

const question = readRealtimeConsultQuestion(params.args);
Expand All @@ -456,7 +468,7 @@ function extractHistoryMessages(result: unknown): RealtimeHistoryMessage[] {
.map((item) => {
const message = asRecord(item);
const role = firstString(message?.role, message?.type, message?.author)?.toLowerCase() ?? null;
const content = extractText(message?.content ?? message?.message ?? message);
const content = extractSourceReplyText(message) || extractText(message?.content ?? message?.message ?? message);
const idempotencyKey = firstString(message?.idempotencyKey, message?.idempotency_key) ?? null;
return { role, content, idempotencyKey };
})
Expand All @@ -467,6 +479,7 @@ function selectHistoryAssistantByRunId(messages: RealtimeHistoryMessage[], runId
const recovered = messages.findLast((message) =>
message.role === "assistant" &&
message.content.trim() &&
!isRealtimeConsultPlaceholderText(message.content) &&
Boolean(
message.idempotencyKey === runId ||
message.idempotencyKey?.startsWith(`${runId}:`)
Expand All @@ -488,13 +501,31 @@ function buildRealtimeConsultChatMessage(args: unknown) {
}

/**
 * Reports whether `text` is a contentless consult reply that should not be
 * handed back as the final answer.
 *
 * A text counts as a placeholder when any of the following holds:
 * - `isAssistantDeliveryPlaceholder` accepts it (shared helper imported from
 *   the chat-recovery module; its exact rules are not visible here);
 * - after normalisation and stripping trailing `.`, `!`, `?` it is exactly one
 *   of the short acknowledgements ("done", "complete", "completed", "ok",
 *   "okay", "all set");
 * - it contains a phrase saying the result was missing or was only a
 *   completion status;
 * - it contains a phrase asking the user to supply the input again.
 *
 * @param text Candidate assistant text.
 * @returns `true` when the text is a placeholder, otherwise `false`.
 */
function isRealtimeConsultPlaceholderText(text: string) {
  if (isAssistantDeliveryPlaceholder(text)) return true;

  // Trimmed, whitespace-collapsed, lower-cased, without trailing punctuation.
  const normalized = normalizeHistoryText(text).replace(/[.!?]+$/g, "");
  // Fold typographic apostrophes so "didn’t" matches the ASCII phrases below.
  const ascii = normalized.replace(/[’‘]/g, "'");

  const mentionsMissingResult = ascii.includes("the result didn't include") ||
    ascii.includes("the result did not include") ||
    ascii.includes("result didn't include") ||
    ascii.includes("result did not include") ||
    ascii.includes("just a completion status") ||
    ascii.includes("only a completion status") ||
    ascii.includes("completion status");
  const asksForInputAgain = ascii.includes("paste the readme") ||
    ascii.includes("share the readme text") ||
    ascii.includes("run the check again") ||
    ascii.includes("without the actual text") ||
    ascii.includes("without the actual content") ||
    ascii.includes("don't have the openclaw result") ||
    ascii.includes("do not have the openclaw result");

  // The phrase checks must be part of the returned expression. Previously the
  // return statement ended after the "all set" comparison, leaving them in an
  // unreachable statement that never influenced the result.
  return normalized === "done" ||
    normalized === "complete" ||
    normalized === "completed" ||
    normalized === "ok" ||
    normalized === "okay" ||
    normalized === "all set" ||
    mentionsMissingResult ||
    asksForInputAgain;
}

function readRealtimeConsultQuestion(args: unknown) {
Expand Down Expand Up @@ -527,7 +558,11 @@ function selectAssistantAfterMatchingQuestion(messages: RealtimeHistoryMessage[]

const recovered = messages
.slice(lastMatchingUserIndex + 1)
.find((message) => message.role === "assistant" && message.content.trim());
.find((message) =>
message.role === "assistant" &&
message.content.trim() &&
!isRealtimeConsultPlaceholderText(message.content)
);
return recovered?.content ?? "";
}

Expand All @@ -539,6 +574,23 @@ function historyTextMatchesQuestion(content: string, normalizedQuestion: string)
(normalizedContent.length >= 8 && normalizedQuestion.includes(normalizedContent));
}

/**
 * Decides whether a gateway event is a terminal event that may carry the
 * consult's final result.
 *
 * @param event Raw gateway event payload.
 * @param state Lower-cased state/status string read from the event, if any.
 * @param sourceReplyText Source-reply text already extracted from the event
 *   (empty string when none was found).
 * @returns `true` only for events in a final/complete/completed state that
 *   look like result carriers; tool-named events qualify only when they
 *   include a source reply.
 */
function isRealtimeConsultTerminalResultEvent(
  event: Record<string, unknown>,
  state: string | undefined,
  sourceReplyText: string,
) {
  const isTerminalState = state === "final" || state === "complete" || state === "completed";
  if (!isTerminalState) return false;

  const kind = firstString(event.event, event.type, event.kind)?.toLowerCase() ?? "";
  const hasSourceReply = Boolean(sourceReplyText);

  // A finished tool event is not the consult result unless it embeds a reply.
  if (!hasSourceReply && kind.includes("tool")) return false;

  if (kind === "chat" || kind === "trace.artifacts") return true;
  if (asRecord(event.message) || hasSourceReply) return true;

  // Last resort: accept events that expose an assistantTexts array, either
  // nested under data/payload or directly on the event.
  const envelope = asRecord(event.data) ?? asRecord(event.payload);
  if (Array.isArray(envelope?.assistantTexts)) return true;
  return Array.isArray(event.assistantTexts);
}

/**
 * Canonicalises text for comparison: strips surrounding whitespace, collapses
 * every internal whitespace run to a single space, and lower-cases the result.
 */
function normalizeHistoryText(value: string) {
  const collapsed = value.trim().split(/\s+/).join(" ");
  return collapsed.toLowerCase();
}
Expand Down Expand Up @@ -581,6 +633,67 @@ function firstString(...values: unknown[]) {
return undefined;
}

/**
 * Attempts to decode a string that looks like a JSON object or array.
 *
 * @param value Raw string, possibly padded with whitespace.
 * @returns The parsed value, or `null` when the trimmed string does not start
 *   with `{` or `[` or is not valid JSON.
 */
function parseJsonRecord(value: string) {
  const candidate = value.trim();
  // charAt(0) is "" for an empty string, which fails both comparisons.
  const opener = candidate.charAt(0);
  if (opener !== "{" && opener !== "[") return null;

  try {
    return JSON.parse(candidate) as unknown;
  } catch {
    return null;
  }
}

/**
 * Searches an arbitrary payload depth-first for an embedded "source reply"
 * and returns its text.
 *
 * Strings are inspected only when they decode as a JSON object/array; arrays
 * are scanned in order; records are checked for a `sourceReply` /
 * `source_reply` object (or `sourceReplyText` / `source_reply_text` string)
 * before descending into a fixed list of container keys. A reply that is
 * itself placeholder text is ignored and the search continues.
 *
 * @param value Payload to search.
 * @param seen Records already visited, so a record is inspected at most once.
 * @returns The first non-placeholder source reply text, or `""` if none.
 */
function extractSourceReplyText(value: unknown, seen = new WeakSet<object>()): string {
  if (!value) return "";

  if (typeof value === "string") {
    const decoded = parseJsonRecord(value);
    if (!decoded) return "";
    return extractSourceReplyText(decoded, seen);
  }

  if (Array.isArray(value)) {
    for (const entry of value) {
      const found = extractSourceReplyText(entry, seen);
      if (found) return found;
    }
    return "";
  }

  const record = asRecord(value);
  if (!record || seen.has(record)) return "";
  seen.add(record);

  // Prefer an explicit reply object; fall back to the flat text fields.
  const reply = asRecord(record.sourceReply) ?? asRecord(record.source_reply);
  const candidate = reply
    ? firstString(reply.text, reply.message, reply.content) || extractText(reply)
    : firstString(record.sourceReplyText, record.source_reply_text);
  if (candidate && !isRealtimeConsultPlaceholderText(candidate)) return candidate;

  // Container keys searched, in priority order, for a nested source reply.
  const nestedKeys = [
    "content",
    "text",
    "output",
    "result",
    "response",
    "data",
    "payload",
    "message",
    "details",
    "metadata",
    "__openclaw",
    "toolResult",
    "tool_result",
    "arguments",
    "args",
  ];
  for (const key of nestedKeys) {
    const found = extractSourceReplyText(record[key], seen);
    if (found) return found;
  }

  return "";
}

function extractEventRunIds(payload: Record<string, unknown>) {
return [
payload.runId,
Expand Down
Loading