Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 71 additions & 3 deletions src/app/api/runtimes/[id]/talk/realtime/relay/route.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ describe("POST /api/runtimes/[id]/talk/realtime/relay", () => {
expect(client.realtimeRelayToolResult).toHaveBeenNthCalledWith(2, {
relaySessionId: "relay_1",
callId: "call_1",
result: { text: "The repo is a CrewCMD app." },
result: { result: "The repo is a CrewCMD app." },
});
expect(mockHoldClient).toHaveBeenCalledWith(client);
expect(mockReleaseClient).toHaveBeenCalledWith(client);
Expand Down Expand Up @@ -395,7 +395,7 @@ describe("POST /api/runtimes/[id]/talk/realtime/relay", () => {
expect(client.realtimeRelayToolResult).toHaveBeenNthCalledWith(2, {
relaySessionId: "relay_1",
callId: "call_1",
result: { text: "The README describes the ClutchCut content engine." },
result: { result: "The README describes the ClutchCut content engine." },
});
expect(mockReleaseClient).toHaveBeenCalledWith(client);
});
Expand Down Expand Up @@ -464,7 +464,75 @@ describe("POST /api/runtimes/[id]/talk/realtime/relay", () => {
expect(client.realtimeRelayToolResult).toHaveBeenNthCalledWith(2, {
relaySessionId: "relay_1",
callId: "call_1",
result: { text: "Recovered from the durable transcript." },
result: { result: "Recovered from the durable transcript." },
});
});

it("recovers generic done realtime consult finals from chat history", async () => {
let gatewayHandler: ((payload: unknown) => void) | null = null;
const client = {
realtimeClientToolCall: vi.fn().mockResolvedValue({ ok: true, runId: "run_1" }),
realtimeRelayToolResult: vi.fn().mockResolvedValue({ ok: true }),
chatHistory: vi.fn().mockResolvedValue({
messages: [
{
role: "user",
content: [{ type: "text", text: "Can you summarize the README?" }],
},
{
role: "assistant",
idempotencyKey: "run_1",
content: [{ type: "text", text: "The README describes product-videogen as a video generation tool." }],
},
],
}),
on: vi.fn((event: string, handler: (payload: unknown) => void) => {
if (event === "*") gatewayHandler = handler;
}),
off: vi.fn(),
};
mockRuntimeRows.push({ id: "rt_1", ownerUserId: "user_1" });
mockGetGatewayClientForRuntime.mockResolvedValue(client);

const responsePromise = POST(
new Request("http://localhost/api/runtimes/rt_1/talk/realtime/relay", {
method: "POST",
body: JSON.stringify({
action: "toolCall",
relaySessionId: "relay_1",
sessionKey: "main",
callId: "call_1",
name: "openclaw_agent_consult",
args: { question: "Can you summarize the README?" },
}),
}),
{ params: Promise.resolve({ id: "rt_1" }) },
);

await vi.waitFor(() => expect(gatewayHandler).toBeTypeOf("function"));

(gatewayHandler as ((payload: unknown) => void) | null)?.({
event: "chat",
runId: "run_1",
state: "final",
message: { role: "assistant", content: [{ type: "text", text: "Done." }] },
});

const response = await responsePromise;
expect(response.status).toBe(200);
await expect(response.json()).resolves.toEqual({
result: {
delegated: true,
runId: "run_1",
finalText: "The README describes product-videogen as a video generation tool.",
result: { ok: true },
},
});
expect(client.chatHistory).toHaveBeenCalledWith({ sessionKey: "main", limit: 25 });
expect(client.realtimeRelayToolResult).toHaveBeenNthCalledWith(2, {
relaySessionId: "relay_1",
callId: "call_1",
result: { result: "The README describes product-videogen as a video generation tool." },
});
});

Expand Down
34 changes: 26 additions & 8 deletions src/app/api/runtimes/[id]/talk/realtime/relay/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ async function runRealtimeToolCall(
const result = await client.realtimeRelayToolResult({
relaySessionId: params.relaySessionId,
callId: params.callId,
result: { text },
result: { result: text },
});
void audit.publish("run_completed");
return { delegated: true, runId, result, finalText: text };
Expand Down Expand Up @@ -235,7 +235,8 @@ function waitForChatFinal(params: {
let settled = false;
let historyPollInFlight = false;
let historyPollTimeout: ReturnType<typeof setTimeout> | null = null;
let emptyFinalDeadlineMs: number | null = null;
let deferredFinalText: string | null = null;
let deferredFinalDeadlineMs: number | null = null;

const timer = setTimeout(() => {
void recoverFromHistory("timeout").then((recovered) => {
Expand All @@ -259,9 +260,9 @@ function waitForChatFinal(params: {
resolve(text);
};

const maybeResolveEmptyFinal = () => {
if (settled || emptyFinalDeadlineMs === null || Date.now() < emptyFinalDeadlineMs) return false;
resolveWithText("OpenClaw completed without returning text.");
const maybeResolveDeferredFinal = () => {
if (settled || deferredFinalDeadlineMs === null || Date.now() < deferredFinalDeadlineMs) return false;
resolveWithText(deferredFinalText ?? "OpenClaw completed without returning text.");
return true;
};

Expand All @@ -287,13 +288,13 @@ function waitForChatFinal(params: {
resolveWithText(recovered);
return recovered;
}
if (!maybeResolveEmptyFinal()) scheduleHistoryPoll();
if (!maybeResolveDeferredFinal()) scheduleHistoryPoll();
return "";
} catch (error) {
if (reason !== "poll") {
console.error(`[api/realtime/relay] Failed to recover realtime consult text from history (${reason}):`, error);
}
if (!maybeResolveEmptyFinal()) scheduleHistoryPoll();
if (!maybeResolveDeferredFinal()) scheduleHistoryPoll();
return "";
} finally {
historyPollInFlight = false;
Expand All @@ -313,10 +314,17 @@ function waitForChatFinal(params: {
if (state === "final" || state === "complete" || state === "completed") {
const text = extractText(event.message) || extractText(event);
if (text) {
if (isRealtimeConsultPlaceholderText(text)) {
deferredFinalText = text;
deferredFinalDeadlineMs ??= Date.now() + REALTIME_EMPTY_FINAL_HISTORY_GRACE_MS;
void recoverFromHistory("placeholder-final");
return;
}
resolveWithText(text);
return;
}
emptyFinalDeadlineMs ??= Date.now() + REALTIME_EMPTY_FINAL_HISTORY_GRACE_MS;
deferredFinalText = "OpenClaw completed without returning text.";
deferredFinalDeadlineMs ??= Date.now() + REALTIME_EMPTY_FINAL_HISTORY_GRACE_MS;
void recoverFromHistory("empty-final");
return;
}
Expand Down Expand Up @@ -479,6 +487,16 @@ function buildRealtimeConsultChatMessage(args: unknown) {
].filter(Boolean).join("\n\n");
}

function isRealtimeConsultPlaceholderText(text: string) {
const normalized = normalizeHistoryText(text).replace(/[.!?]+$/g, "");
return normalized === "done" ||
normalized === "complete" ||
normalized === "completed" ||
normalized === "ok" ||
normalized === "okay" ||
normalized === "all set";
}

function readRealtimeConsultQuestion(args: unknown) {
const record = readRealtimeConsultArgs(args);
return firstString(record?.question, record?.prompt, record?.query, record?.task);
Expand Down
12 changes: 6 additions & 6 deletions src/lib/realtime-voice-gateway-relay.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ describe("realtime gateway relay barge-in detection", () => {
}
});

it("ignores mobile speaker echo during the output grace window", () => {
it("does not trigger mobile barge-in during the output grace window", () => {
const result = detectRealtimeBargeIn({
input: inputWithLevel(0.3),
activeOutput: true,
Expand All @@ -46,7 +46,7 @@ describe("realtime gateway relay barge-in detection", () => {
profile: MOBILE_REALTIME_BARGE_IN_PROFILE,
});

expect(result).toEqual({ triggered: false, speechFrames: 0, suppressInput: true });
expect(result).toEqual({ triggered: false, speechFrames: 1, suppressInput: false });
});

it("requires sustained stronger speech before mobile barge-in", () => {
Expand Down Expand Up @@ -79,7 +79,7 @@ describe("realtime gateway relay barge-in detection", () => {
expect(result.suppressInput).toBe(false);
});

it("does not interrupt mobile output for short speech bursts", () => {
it("keeps likely mobile speech audible while waiting to confirm barge-in", () => {
let speechFrames = 0;
for (let i = 0; i < MOBILE_REALTIME_BARGE_IN_PROFILE.frames - 2; i += 1) {
const result = detectRealtimeBargeIn({
Expand All @@ -93,13 +93,13 @@ describe("realtime gateway relay barge-in detection", () => {
});
speechFrames = result.speechFrames;
expect(result.triggered).toBe(false);
expect(result.suppressInput).toBe(true);
expect(result.suppressInput).toBe(false);
}
});

it("suppresses mobile playback echo until barge-in is confirmed", () => {
it("suppresses mobile playback echo until likely speech starts", () => {
const echo = detectRealtimeBargeIn({
input: inputWithLevel(0.08),
input: inputWithLevel(0.04),
activeOutput: true,
cancelRequested: false,
speechFrames: 0,
Expand Down
15 changes: 7 additions & 8 deletions src/lib/realtime-voice-gateway-relay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ export type RealtimeVoiceStatus = "idle" | "listening" | "processing" | "speakin
const BARGE_IN_RMS_THRESHOLD = 0.03;
const BARGE_IN_PEAK_THRESHOLD = 0.1;
const BARGE_IN_FRAMES = 3;
const MOBILE_BARGE_IN_RMS_THRESHOLD = 0.075;
const MOBILE_BARGE_IN_PEAK_THRESHOLD = 0.22;
const MOBILE_BARGE_IN_FRAMES = 7;
const MOBILE_BARGE_IN_GRACE_MS = 1200;
const MOBILE_BARGE_IN_RMS_THRESHOLD = 0.055;
const MOBILE_BARGE_IN_PEAK_THRESHOLD = 0.16;
const MOBILE_BARGE_IN_FRAMES = 3;
const MOBILE_BARGE_IN_GRACE_MS = 450;
const REALTIME_VOICE_CONTEXT_LIMIT = 8;

export interface RealtimeBargeInProfile {
Expand Down Expand Up @@ -451,8 +451,7 @@ export function detectRealtimeBargeIn(input: RealtimeBargeInDetectionInput) {
if (
!input.activeOutput ||
input.cancelRequested ||
input.input.length === 0 ||
isWithinGraceWindow(input)
input.input.length === 0
) {
return {
triggered: false,
Expand All @@ -475,9 +474,9 @@ export function detectRealtimeBargeIn(input: RealtimeBargeInDetectionInput) {
: 0;

return {
triggered: speechFrames >= input.profile.frames,
triggered: !isWithinGraceWindow(input) && speechFrames >= input.profile.frames,
speechFrames,
suppressInput: input.profile.suppressEchoInput && input.activeOutput && speechFrames < input.profile.frames,
suppressInput: input.profile.suppressEchoInput && input.activeOutput && speechFrames === 0,
};
}

Expand Down
Loading