Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added .pr/issue-1352-before-after.gif
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added .pr/issue-1352-fixed.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added .pr/issue-1352-main-broken.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
51 changes: 49 additions & 2 deletions __tests__/contexts/conversation-websocket-context.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ import { useBrowserStore } from "#/stores/browser-store";
import { useUserConversation } from "#/hooks/query/use-user-conversation";
import EventService from "#/api/event-service/event-service.api";
import { getStoredConversationMetadata } from "#/api/conversation-metadata-store";
import type { MessageEvent } from "#/types/agent-server/core";
import { ExecutionStatus, type MessageEvent } from "#/types/agent-server/core";

// Captures the main socket's `onMessage` (`handleMainMessage`) so tests can
// drive the live message path without a real WebSocket. Only the main socket
// gets a non-empty url (planning stays ""), so url presence discriminates it.
const wsCapture = vi.hoisted(() => ({
mainOnMessage: null as null | ((event: { data: string }) => void),
mainQueryParams: null as null | Record<string, string | boolean>,
}));

// Keep the units under test real (the provider, `useConversationHistory`, the
Expand All @@ -25,10 +26,14 @@ vi.mock("#/hooks/use-websocket", () => ({
useWebSocket: vi.fn(
(
url: string,
options?: { onMessage?: (event: { data: string }) => void },
options?: {
onMessage?: (event: { data: string }) => void;
queryParams?: Record<string, string | boolean>;
},
) => {
if (url && options?.onMessage) {
wsCapture.mainOnMessage = options.onMessage;
wsCapture.mainQueryParams = options.queryParams ?? null;
}
return { socket: null, reconnect: vi.fn() };
},
Expand Down Expand Up @@ -71,6 +76,7 @@ describe("ConversationWebSocketProvider — conversation-scoped event store", ()

beforeEach(() => {
wsCapture.mainOnMessage = null;
wsCapture.mainQueryParams = null;
window.localStorage.clear();
queryClient = new QueryClient({
defaultOptions: { queries: { retry: false } },
Expand Down Expand Up @@ -153,6 +159,47 @@ describe("ConversationWebSocketProvider — conversation-scoped event store", ()
);
});

it("replays ACP sockets from the latest user message to backfill streaming deltas", async () => {
const user = {
...createUserMessageEvent("user-anchor"),
timestamp: "2024-03-01T00:00:00Z",
};
const agent = {
...createUserMessageEvent("agent-after-user"),
timestamp: "2024-03-01T00:00:05Z",
source: "agent",
llm_message: {
role: "assistant",
content: [{ type: "text", text: "Persisted final text" }],
},
} as MessageEvent;

vi.mocked(EventService.searchEvents).mockResolvedValueOnce({
items: [agent, user],
next_page_id: null,
});

render(
<QueryClientProvider client={queryClient}>
<ConversationWebSocketProvider
conversationId="conv-acp"
conversationUrl="http://localhost/api"
agentKind="acp"
executionStatus={ExecutionStatus.RUNNING}
>
<div />
</ConversationWebSocketProvider>
</QueryClientProvider>,
);

await waitFor(() =>
expect(wsCapture.mainQueryParams).toMatchObject({
resend_mode: "since",
after_timestamp: "2024-03-01T00:00:00Z",
}),
);
});

it("clears the previous conversation's events when switching conversations", async () => {
// Arrange + Act: open conversation A.
const { rerender } = renderProvider("conv-a");
Expand Down
105 changes: 105 additions & 0 deletions __tests__/stores/use-event-store.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
SecurityRisk,
} from "#/types/agent-server/core";
import { StreamingDeltaEvent } from "#/types/agent-server/core/events/streaming-delta-event";
import { ACPToolCallEvent } from "#/types/agent-server/core/events/acp-tool-call-event";

const mockUserMessageEvent: MessageEvent = {
id: "test-event-1",
Expand Down Expand Up @@ -93,6 +94,41 @@ const makeUserMessageEvent = (id: string, timestamp: string): MessageEvent => ({
timestamp,
});

const makeAgentMessageEvent = (
id: string,
timestamp: string,
text: string,
): MessageEvent => ({
id,
timestamp,
source: "agent",
llm_message: {
role: "assistant",
content: [{ type: "text", text }],
},
activated_microagents: [],
extended_content: [],
});

const makeACPToolCallEvent = (
id: string,
timestamp: string,
status: ACPToolCallEvent["status"],
): ACPToolCallEvent => ({
kind: "ACPToolCallEvent",
id,
timestamp,
source: "agent",
tool_call_id: "acp-tool-1",
title: "python train.py",
tool_kind: "execute",
status,
raw_input: { command: "python train.py" },
raw_output: status === "completed" ? "done" : null,
content: null,
is_error: false,
});

describe("useEventStore", () => {
it("should render initial state correctly", () => {
const { result } = renderHook(() => useEventStore());
Expand Down Expand Up @@ -217,6 +253,75 @@ describe("useEventStore", () => {
]);
});

it("keeps ACP terminal tool calls at their started position after history normalization", () => {
const { result } = renderHook(() => useEventStore());
const user = makeUserMessageEvent("user-1", "2024-03-01T00:00:00Z");
const firstDelta = makeStreamingDeltaEvent("delta-1", "The rerun failed.");
const toolStarted = makeACPToolCallEvent(
"acp-started",
"2024-03-01T00:00:02Z",
"in_progress",
);
const secondDelta = makeStreamingDeltaEvent("delta-3", "I am checking it.");
const toolCompleted = makeACPToolCallEvent(
"acp-completed",
"2024-03-01T00:00:04Z",
"completed",
);

act(() => {
result.current.addEvents([
user,
firstDelta,
toolStarted,
secondDelta,
toolCompleted,
]);
});

expect(result.current.uiEvents.map((event) => event.id)).toEqual([
"user-1",
"delta-1",
"acp-completed",
"delta-3",
]);
});

it("rebuilds UI events when replayed ACP deltas arrive before an existing final message", () => {
const { result } = renderHook(() => useEventStore());
const user = makeUserMessageEvent("user-1", "2024-03-01T00:00:00Z");
const tool = makeACPToolCallEvent(
"acp-completed",
"2024-03-01T00:00:03Z",
"completed",
);
const finalMessage = makeAgentMessageEvent(
"agent-final",
"2024-03-01T00:00:04Z",
"The rerun failed. I am checking it.",
);
const replayedDelta = makeStreamingDeltaEvent(
"delta-1",
"The rerun failed.",
);
replayedDelta.timestamp = "2024-03-01T00:00:01Z";

act(() => {
result.current.addEvents([user, tool, finalMessage]);
result.current.addEvent(replayedDelta);
});

expect(result.current.uiEvents.map((event) => event.id)).toEqual([
"user-1",
"delta-1",
"acp-completed",
]);
expect(result.current.uiEvents[1]).toMatchObject({
id: "delta-1",
content: "The rerun failed. I am checking it.",
});
});

it("should clear all events when clearEvents is called", () => {
const { result } = renderHook(() => useEventStore());

Expand Down
50 changes: 39 additions & 11 deletions src/contexts/conversation-websocket-context.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import type {
ConversationErrorEvent,
ServerErrorEvent,
} from "#/types/agent-server/core/events/conversation-state-event";
import { ExecutionStatus } from "#/types/agent-server/core";
import { handleActionEventCacheInvalidation } from "#/utils/cache-utils";
import { buildWebSocketUrl } from "#/utils/websocket-url";
import type {
Expand Down Expand Up @@ -115,13 +116,17 @@ export function ConversationWebSocketProvider({
conversationId,
conversationUrl,
sessionApiKey,
agentKind,
executionStatus,
subConversations,
subConversationIds,
}: {
children: React.ReactNode;
conversationId?: string;
conversationUrl?: string | null;
sessionApiKey?: string | null;
agentKind?: "openhands" | "acp" | null;
executionStatus?: ExecutionStatus | null;
subConversations?: AppConversation[];
subConversationIds?: string[];
}) {
Expand Down Expand Up @@ -292,24 +297,46 @@ export function ConversationWebSocketProvider({
]);

/**
* Timestamp of the latest event we already have from REST. Used as
* `after_timestamp` when opening the WebSocket so the server only resends
* events strictly after this point. `null` until the REST query settles
* (we hold the WS connection open until then to avoid an `all` resend).
* Timestamp used for the main WebSocket's `resend_mode='since'` replay.
* REST history can omit ACP streaming deltas while a turn is active, so ACP
* sockets replay from the latest user message instead of the latest REST
* event. The event store de-dupes persisted tool events and backfills the
* streaming assistant messages at their original positions.
*/
const initialAfterTimestamp = useMemo<string | null>(() => {
if (isPreloadingHistory) return null;
const events = preloadedHistory?.events ?? [];

if (agentKind === "acp") {
const latestUserMessage = [...events]
.reverse()
.find((event) => isUserMessageEvent(event));
if (
latestUserMessage &&
"timestamp" in latestUserMessage &&
latestUserMessage.timestamp
) {
return latestUserMessage.timestamp;
}

const shouldReplayAllForActiveAcp =
executionStatus === ExecutionStatus.RUNNING ||
executionStatus === ExecutionStatus.PAUSED ||
executionStatus === ExecutionStatus.WAITING_FOR_CONFIRMATION;
if (shouldReplayAllForActiveAcp) return null;
}

const latest = events[events.length - 1];
if (!latest || !("timestamp" in latest) || !latest.timestamp) return null;
return latest.timestamp;
}, [preloadedHistory, isPreloadingHistory]);
}, [agentKind, executionStatus, preloadedHistory, isPreloadingHistory]);

// Build WebSocket URL from props.
//
// We deliberately wait for the initial REST history fetch to settle before
// opening the socket so the WS subscription can use `resend_mode='since'`
// with a meaningful `after_timestamp`. Without this gate, the WS would open
// with a meaningful `after_timestamp` (latest REST event normally, latest
// user message for ACP backfill). Without this gate, the WS would open
// immediately and either replay the entire conversation (when falling back
// to `resend_mode='all'`) or miss events that arrived between REST and WS.
const wsUrl = useMemo(() => {
Expand Down Expand Up @@ -821,11 +848,12 @@ export function ConversationWebSocketProvider({
// Separate WebSocket options for main connection
const mainWebsocketOptions: WebSocketHookOptions = useMemo(() => {
// History was already loaded over REST (`useConversationHistory`).
// Subscribe with `resend_mode='since'` so the server only resends events
// strictly after the latest one we already have. If REST returned no
// events at all (brand-new conversation), fall back to `'all'` so any
// events that may have been written between the REST call and the WS
// handshake still show up. Dedup in the event store handles overlap.
// Subscribe with `resend_mode='since'` so the server only resends from the
// selected replay anchor. For ACP conversations that anchor is the latest
// user message, because ACP streaming deltas can be missing from REST while
// still replayable over the socket. If no anchor is available, fall back to
// `'all'` so events written between REST and the WS handshake still show up.
// Dedup in the event store handles overlap.
const queryParams: Record<string, string | boolean> = initialAfterTimestamp
? { resend_mode: "since", after_timestamp: initialAfterTimestamp }
: { resend_mode: "all" };
Expand Down
2 changes: 2 additions & 0 deletions src/contexts/websocket-provider-wrapper.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ export function WebSocketProviderWrapper({
conversationId={conversationId}
conversationUrl={conversationUrl}
sessionApiKey={conversation?.session_api_key}
agentKind={conversation?.agent_kind}
executionStatus={conversation?.execution_status}
subConversationIds={conversation?.sub_conversation_ids}
subConversations={filteredSubConversations}
>
Expand Down
46 changes: 39 additions & 7 deletions src/stores/use-event-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,43 @@ const appendEvent = (state: EventState, event: OHEvent): EventState => {
};
};

const sortEventState = (state: EventState): EventState => ({
...state,
events: [...state.events].sort(compareEventsByTimestamp),
uiEvents: [...state.uiEvents].sort(compareEventsByTimestamp),
});
const compactConsecutiveStreamingDeltas = (events: OHEvent[]): OHEvent[] => {
const compacted: OHEvent[] = [];

for (const event of events) {
const lastIndex = compacted.length - 1;
const lastEvent = compacted[lastIndex];
if (
lastEvent &&
isStreamingDeltaEvent(event) &&
isStreamingDeltaEvent(lastEvent)
) {
compacted[lastIndex] = mergeStreamingDeltaEvent(event, lastEvent);
} else {
compacted.push(event);
}
}

return compacted;
};

const buildUiEvents = (events: OHEvent[]): OHEvent[] =>
events.reduce<OHEvent[]>(
(uiEvents, event) => handleEventForUI(event, uiEvents),
[],
);

const normalizeEventState = (state: EventState): EventState => {
const events = compactConsecutiveStreamingDeltas(
[...state.events].sort(compareEventsByTimestamp),
);

return {
...state,
events,
uiEvents: buildUiEvents(events),
};
};

const applyAddEvent = (state: EventState, event: OHEvent): EventState => {
const next = appendEvent(state, event);
Expand All @@ -140,7 +172,7 @@ const applyAddEvent = (state: EventState, event: OHEvent): EventState => {
return next;
}

return sortEventState(next);
return normalizeEventState(next);
};

export const useEventStore = create<EventState>()((set) => ({
Expand Down Expand Up @@ -188,7 +220,7 @@ export const useEventStore = create<EventState>()((set) => ({
return state;
}

return sortEventState({
return normalizeEventState({
...state,
events,
eventIds,
Expand Down
Loading