diff --git a/.pr/issue-1352-before-after.gif b/.pr/issue-1352-before-after.gif new file mode 100644 index 000000000..8ed0312fc Binary files /dev/null and b/.pr/issue-1352-before-after.gif differ diff --git a/.pr/issue-1352-fixed.png b/.pr/issue-1352-fixed.png new file mode 100644 index 000000000..7780a9ae5 Binary files /dev/null and b/.pr/issue-1352-fixed.png differ diff --git a/.pr/issue-1352-main-broken.png b/.pr/issue-1352-main-broken.png new file mode 100644 index 000000000..44a8e7703 Binary files /dev/null and b/.pr/issue-1352-main-broken.png differ diff --git a/__tests__/contexts/conversation-websocket-context.test.tsx b/__tests__/contexts/conversation-websocket-context.test.tsx index 462212b33..1e8850ec2 100644 --- a/__tests__/contexts/conversation-websocket-context.test.tsx +++ b/__tests__/contexts/conversation-websocket-context.test.tsx @@ -9,13 +9,14 @@ import { useBrowserStore } from "#/stores/browser-store"; import { useUserConversation } from "#/hooks/query/use-user-conversation"; import EventService from "#/api/event-service/event-service.api"; import { getStoredConversationMetadata } from "#/api/conversation-metadata-store"; -import type { MessageEvent } from "#/types/agent-server/core"; +import { ExecutionStatus, type MessageEvent } from "#/types/agent-server/core"; // Captures the main socket's `onMessage` (`handleMainMessage`) so tests can // drive the live message path without a real WebSocket. Only the main socket // gets a non-empty url (planning stays ""), so url presence discriminates it. const wsCapture = vi.hoisted(() => ({ mainOnMessage: null as null | ((event: { data: string }) => void), + mainQueryParams: null as null | Record, })); // Keep the units under test real (the provider, `useConversationHistory`, the @@ -25,10 +26,14 @@ vi.mock("#/hooks/use-websocket", () => ({ useWebSocket: vi.fn( ( url: string, - options?: { onMessage?: (event: { data: string }) => void }, + options?: { + onMessage?: (event: { data: string }) => void; + queryParams?: Record; + }, ) => { if (url && options?.onMessage) { wsCapture.mainOnMessage = options.onMessage; + wsCapture.mainQueryParams = options.queryParams ?? null; } return { socket: null, reconnect: vi.fn() }; }, @@ -71,6 +76,7 @@ describe("ConversationWebSocketProvider — conversation-scoped event store", () beforeEach(() => { wsCapture.mainOnMessage = null; + wsCapture.mainQueryParams = null; window.localStorage.clear(); queryClient = new QueryClient({ defaultOptions: { queries: { retry: false } }, @@ -153,6 +159,47 @@ describe("ConversationWebSocketProvider — conversation-scoped event store", () ); }); + it("replays ACP sockets from the latest user message to backfill streaming deltas", async () => { + const user = { + ...createUserMessageEvent("user-anchor"), + timestamp: "2024-03-01T00:00:00Z", + }; + const agent = { + ...createUserMessageEvent("agent-after-user"), + timestamp: "2024-03-01T00:00:05Z", + source: "agent", + llm_message: { + role: "assistant", + content: [{ type: "text", text: "Persisted final text" }], + }, + } as MessageEvent; + + vi.mocked(EventService.searchEvents).mockResolvedValueOnce({ + items: [agent, user], + next_page_id: null, + }); + + render( + + +
+ + , + ); + + await waitFor(() => + expect(wsCapture.mainQueryParams).toMatchObject({ + resend_mode: "since", + after_timestamp: "2024-03-01T00:00:00Z", + }), + ); + }); + it("clears the previous conversation's events when switching conversations", async () => { // Arrange + Act: open conversation A. const { rerender } = renderProvider("conv-a"); diff --git a/__tests__/stores/use-event-store.test.ts b/__tests__/stores/use-event-store.test.ts index 1e8d36cb1..d5029be58 100644 --- a/__tests__/stores/use-event-store.test.ts +++ b/__tests__/stores/use-event-store.test.ts @@ -8,6 +8,7 @@ import { SecurityRisk, } from "#/types/agent-server/core"; import { StreamingDeltaEvent } from "#/types/agent-server/core/events/streaming-delta-event"; +import { ACPToolCallEvent } from "#/types/agent-server/core/events/acp-tool-call-event"; const mockUserMessageEvent: MessageEvent = { id: "test-event-1", @@ -93,6 +94,41 @@ const makeUserMessageEvent = (id: string, timestamp: string): MessageEvent => ({ timestamp, }); +const makeAgentMessageEvent = ( + id: string, + timestamp: string, + text: string, +): MessageEvent => ({ + id, + timestamp, + source: "agent", + llm_message: { + role: "assistant", + content: [{ type: "text", text }], + }, + activated_microagents: [], + extended_content: [], +}); + +const makeACPToolCallEvent = ( + id: string, + timestamp: string, + status: ACPToolCallEvent["status"], +): ACPToolCallEvent => ({ + kind: "ACPToolCallEvent", + id, + timestamp, + source: "agent", + tool_call_id: "acp-tool-1", + title: "python train.py", + tool_kind: "execute", + status, + raw_input: { command: "python train.py" }, + raw_output: status === "completed" ? "done" : null, + content: null, + is_error: false, +}); + describe("useEventStore", () => { it("should render initial state correctly", () => { const { result } = renderHook(() => useEventStore()); @@ -217,6 +253,75 @@ describe("useEventStore", () => { ]); }); + it("keeps ACP terminal tool calls at their started position after history normalization", () => { + const { result } = renderHook(() => useEventStore()); + const user = makeUserMessageEvent("user-1", "2024-03-01T00:00:00Z"); + const firstDelta = makeStreamingDeltaEvent("delta-1", "The rerun failed."); + const toolStarted = makeACPToolCallEvent( + "acp-started", + "2024-03-01T00:00:02Z", + "in_progress", + ); + const secondDelta = makeStreamingDeltaEvent("delta-3", "I am checking it."); + const toolCompleted = makeACPToolCallEvent( + "acp-completed", + "2024-03-01T00:00:04Z", + "completed", + ); + + act(() => { + result.current.addEvents([ + user, + firstDelta, + toolStarted, + secondDelta, + toolCompleted, + ]); + }); + + expect(result.current.uiEvents.map((event) => event.id)).toEqual([ + "user-1", + "delta-1", + "acp-completed", + "delta-3", + ]); + }); + + it("rebuilds UI events when replayed ACP deltas arrive before an existing final message", () => { + const { result } = renderHook(() => useEventStore()); + const user = makeUserMessageEvent("user-1", "2024-03-01T00:00:00Z"); + const tool = makeACPToolCallEvent( + "acp-completed", + "2024-03-01T00:00:03Z", + "completed", + ); + const finalMessage = makeAgentMessageEvent( + "agent-final", + "2024-03-01T00:00:04Z", + "The rerun failed. I am checking it.", + ); + const replayedDelta = makeStreamingDeltaEvent( + "delta-1", + "The rerun failed.", + ); + replayedDelta.timestamp = "2024-03-01T00:00:01Z"; + + act(() => { + result.current.addEvents([user, tool, finalMessage]); + result.current.addEvent(replayedDelta); + }); + + expect(result.current.uiEvents.map((event) => event.id)).toEqual([ + "user-1", + "delta-1", + "acp-completed", + ]); + expect(result.current.uiEvents[1]).toMatchObject({ + id: "delta-1", + content: "The rerun failed. I am checking it.", + }); + }); + it("should clear all events when clearEvents is called", () => { const { result } = renderHook(() => useEventStore()); diff --git a/src/contexts/conversation-websocket-context.tsx b/src/contexts/conversation-websocket-context.tsx index c3c8389fd..940a4a50a 100644 --- a/src/contexts/conversation-websocket-context.tsx +++ b/src/contexts/conversation-websocket-context.tsx @@ -44,6 +44,7 @@ import type { ConversationErrorEvent, ServerErrorEvent, } from "#/types/agent-server/core/events/conversation-state-event"; +import { ExecutionStatus } from "#/types/agent-server/core"; import { handleActionEventCacheInvalidation } from "#/utils/cache-utils"; import { buildWebSocketUrl } from "#/utils/websocket-url"; import type { @@ -115,6 +116,8 @@ export function ConversationWebSocketProvider({ conversationId, conversationUrl, sessionApiKey, + agentKind, + executionStatus, subConversations, subConversationIds, }: { @@ -122,6 +125,8 @@ export function ConversationWebSocketProvider({ conversationId?: string; conversationUrl?: string | null; sessionApiKey?: string | null; + agentKind?: "openhands" | "acp" | null; + executionStatus?: ExecutionStatus | null; subConversations?: AppConversation[]; subConversationIds?: string[]; }) { @@ -292,24 +297,46 @@ export function ConversationWebSocketProvider({ ]); /** - * Timestamp of the latest event we already have from REST. Used as - * `after_timestamp` when opening the WebSocket so the server only resends - * events strictly after this point. `null` until the REST query settles - * (we hold the WS connection open until then to avoid an `all` resend). + * Timestamp used for the main WebSocket's `resend_mode='since'` replay. + * REST history can omit ACP streaming deltas while a turn is active, so ACP + * sockets replay from the latest user message instead of the latest REST + * event. The event store de-dupes persisted tool events and backfills the + * streaming assistant messages at their original positions. */ const initialAfterTimestamp = useMemo(() => { if (isPreloadingHistory) return null; const events = preloadedHistory?.events ?? []; + + if (agentKind === "acp") { + const latestUserMessage = [...events] + .reverse() + .find((event) => isUserMessageEvent(event)); + if ( + latestUserMessage && + "timestamp" in latestUserMessage && + latestUserMessage.timestamp + ) { + return latestUserMessage.timestamp; + } + + const shouldReplayAllForActiveAcp = + executionStatus === ExecutionStatus.RUNNING || + executionStatus === ExecutionStatus.PAUSED || + executionStatus === ExecutionStatus.WAITING_FOR_CONFIRMATION; + if (shouldReplayAllForActiveAcp) return null; + } + const latest = events[events.length - 1]; if (!latest || !("timestamp" in latest) || !latest.timestamp) return null; return latest.timestamp; - }, [preloadedHistory, isPreloadingHistory]); + }, [agentKind, executionStatus, preloadedHistory, isPreloadingHistory]); // Build WebSocket URL from props. // // We deliberately wait for the initial REST history fetch to settle before // opening the socket so the WS subscription can use `resend_mode='since'` - // with a meaningful `after_timestamp`. Without this gate, the WS would open + // with a meaningful `after_timestamp` (latest REST event normally, latest + // user message for ACP backfill). Without this gate, the WS would open // immediately and either replay the entire conversation (when falling back // to `resend_mode='all'`) or miss events that arrived between REST and WS. const wsUrl = useMemo(() => { @@ -821,11 +848,12 @@ export function ConversationWebSocketProvider({ // Separate WebSocket options for main connection const mainWebsocketOptions: WebSocketHookOptions = useMemo(() => { // History was already loaded over REST (`useConversationHistory`). - // Subscribe with `resend_mode='since'` so the server only resends events - // strictly after the latest one we already have. If REST returned no - // events at all (brand-new conversation), fall back to `'all'` so any - // events that may have been written between the REST call and the WS - // handshake still show up. Dedup in the event store handles overlap. + // Subscribe with `resend_mode='since'` so the server only resends from the + // selected replay anchor. For ACP conversations that anchor is the latest + // user message, because ACP streaming deltas can be missing from REST while + // still replayable over the socket. If no anchor is available, fall back to + // `'all'` so events written between REST and the WS handshake still show up. + // Dedup in the event store handles overlap. const queryParams: Record = initialAfterTimestamp ? { resend_mode: "since", after_timestamp: initialAfterTimestamp } : { resend_mode: "all" }; diff --git a/src/contexts/websocket-provider-wrapper.tsx b/src/contexts/websocket-provider-wrapper.tsx index 0e9a375ae..35a4e4acd 100644 --- a/src/contexts/websocket-provider-wrapper.tsx +++ b/src/contexts/websocket-provider-wrapper.tsx @@ -37,6 +37,8 @@ export function WebSocketProviderWrapper({ conversationId={conversationId} conversationUrl={conversationUrl} sessionApiKey={conversation?.session_api_key} + agentKind={conversation?.agent_kind} + executionStatus={conversation?.execution_status} subConversationIds={conversation?.sub_conversation_ids} subConversations={filteredSubConversations} > diff --git a/src/stores/use-event-store.ts b/src/stores/use-event-store.ts index 7c89e5189..151131633 100644 --- a/src/stores/use-event-store.ts +++ b/src/stores/use-event-store.ts @@ -121,11 +121,43 @@ const appendEvent = (state: EventState, event: OHEvent): EventState => { }; }; -const sortEventState = (state: EventState): EventState => ({ - ...state, - events: [...state.events].sort(compareEventsByTimestamp), - uiEvents: [...state.uiEvents].sort(compareEventsByTimestamp), -}); +const compactConsecutiveStreamingDeltas = (events: OHEvent[]): OHEvent[] => { + const compacted: OHEvent[] = []; + + for (const event of events) { + const lastIndex = compacted.length - 1; + const lastEvent = compacted[lastIndex]; + if ( + lastEvent && + isStreamingDeltaEvent(event) && + isStreamingDeltaEvent(lastEvent) + ) { + compacted[lastIndex] = mergeStreamingDeltaEvent(event, lastEvent); + } else { + compacted.push(event); + } + } + + return compacted; +}; + +const buildUiEvents = (events: OHEvent[]): OHEvent[] => + events.reduce( + (uiEvents, event) => handleEventForUI(event, uiEvents), + [], + ); + +const normalizeEventState = (state: EventState): EventState => { + const events = compactConsecutiveStreamingDeltas( + [...state.events].sort(compareEventsByTimestamp), + ); + + return { + ...state, + events, + uiEvents: buildUiEvents(events), + }; +}; const applyAddEvent = (state: EventState, event: OHEvent): EventState => { const next = appendEvent(state, event); @@ -140,7 +172,7 @@ const applyAddEvent = (state: EventState, event: OHEvent): EventState => { return next; } - return sortEventState(next); + return normalizeEventState(next); }; export const useEventStore = create()((set) => ({ @@ -188,7 +220,7 @@ export const useEventStore = create()((set) => ({ return state; } - return sortEventState({ + return normalizeEventState({ ...state, events, eventIds,