diff --git a/openhands-agent-server/openhands/agent_server/event_service.py b/openhands-agent-server/openhands/agent_server/event_service.py index 86d119e10a..e34b86895e 100644 --- a/openhands-agent-server/openhands/agent_server/event_service.py +++ b/openhands-agent-server/openhands/agent_server/event_service.py @@ -6,6 +6,8 @@ from pathlib import Path from uuid import UUID, uuid4 +from pydantic import ValidationError + from openhands.agent_server.conversation_lease import ( ConversationLease, ConversationOwnershipLostError, @@ -204,6 +206,18 @@ def _event_matches_filters( return False return True + def _get_searchable_event(self, events, index: int) -> Event | None: + try: + return events[index] + except (FileNotFoundError, UnicodeDecodeError, ValidationError) as exc: + logger.warning( + "Skipping unreadable event at index %d for conversation %s: %s", + index, + self.stored.id, + exc, + ) + return None + def _search_events_sync( self, page_id: str | None = None, @@ -259,7 +273,8 @@ def _search_events_sync( start_index = None else: for i in range(total): - if events[i].id == page_id: + event = self._get_searchable_event(events, i) + if event is not None and event.id == page_id: start_index = i break if start_index is None: @@ -273,7 +288,9 @@ def _search_events_sync( items: list[Event] = [] next_page_id: str | None = None for i in indices: - event = events[i] + event = self._get_searchable_event(events, i) + if event is None: + continue if not self._event_matches_filters( event, kind, source, body, timestamp_gte_str, timestamp_lt_str ): @@ -347,7 +364,10 @@ def _count_events_sync( timestamp_lt_str = timestamp__lt.isoformat() if timestamp__lt else None count = 0 - for event in events: + for i in range(len(events)): + event = self._get_searchable_event(events, i) + if event is None: + continue if self._event_matches_filters( event, kind, source, body, timestamp_gte_str, timestamp_lt_str ): diff --git a/tests/agent_server/test_event_service.py b/tests/agent_server/test_event_service.py index 519ef88561..0361a1340d 100644 --- a/tests/agent_server/test_event_service.py +++ b/tests/agent_server/test_event_service.py @@ -24,6 +24,7 @@ from openhands.agent_server.pub_sub import Subscriber from openhands.sdk import LLM, Agent, AgentBase, Conversation, Message from openhands.sdk.agent import ACPAgent +from openhands.sdk.conversation.event_store import EventLog from openhands.sdk.conversation.fifo_lock import FIFOLock from openhands.sdk.conversation.impl.local_conversation import ( ACP_INFLIGHT_PROMPT_USER_MESSAGE_ID, @@ -41,6 +42,7 @@ MessageEvent, ObservationEvent, ) +from openhands.sdk.io.memory import InMemoryFileStore from openhands.sdk.llm import MessageToolCall, TextContent from openhands.sdk.security.confirmation_policy import NeverConfirm from openhands.sdk.workspace import LocalWorkspace @@ -314,6 +316,81 @@ async def test_search_events_large_limit( assert len(result.items) == 5 # All available events assert result.next_page_id is None + @pytest.mark.asyncio + async def test_search_events_skips_empty_event_files(self, event_service): + """Unreadable persisted events should not abort event pagination.""" + fs = InMemoryFileStore() + event0 = MessageEvent( + id="00000000-0000-0000-0000-000000000001", + source="user", + llm_message=Message(role="user", content=[TextContent(text="first")]), + timestamp="2026-06-16T09:00:00", + ) + missing_event_id = "00000000-0000-0000-0000-000000000002" + event2 = MessageEvent( + id="00000000-0000-0000-0000-000000000003", + source="user", + llm_message=Message(role="user", content=[TextContent(text="third")]), + timestamp="2026-06-16T09:00:02", + ) + fs.write( + f"events/event-00000-{event0.id}.json", + event0.model_dump_json(exclude_none=True), + ) + fs.write(f"events/event-00001-{missing_event_id}.json", "") + fs.write( + f"events/event-00002-{event2.id}.json", + event2.model_dump_json(exclude_none=True), + ) + + conversation = MagicMock(spec=Conversation) + state = MagicMock(spec=ConversationState) + state.events = EventLog(fs) + conversation._state = state + event_service._conversation = conversation + + result = await event_service.search_events(limit=10) + + assert [event.id for event in result.items] == [event0.id, event2.id] + assert result.next_page_id is None + + @pytest.mark.asyncio + async def test_count_events_skips_empty_event_files(self, event_service): + """Filtered event counts should skip unreadable persisted events.""" + fs = InMemoryFileStore() + event0 = MessageEvent( + id="00000000-0000-0000-0000-000000000001", + source="user", + llm_message=Message(role="user", content=[TextContent(text="first")]), + timestamp="2026-06-16T09:00:00", + ) + missing_event_id = "00000000-0000-0000-0000-000000000002" + event2 = MessageEvent( + id="00000000-0000-0000-0000-000000000003", + source="user", + llm_message=Message(role="user", content=[TextContent(text="third")]), + timestamp="2026-06-16T09:00:02", + ) + fs.write( + f"events/event-00000-{event0.id}.json", + event0.model_dump_json(exclude_none=True), + ) + fs.write(f"events/event-00001-{missing_event_id}.json", "") + fs.write( + f"events/event-00002-{event2.id}.json", + event2.model_dump_json(exclude_none=True), + ) + + conversation = MagicMock(spec=Conversation) + state = MagicMock(spec=ConversationState) + state.events = EventLog(fs) + conversation._state = state + event_service._conversation = conversation + + count = await event_service.count_events(source="user") + + assert count == 2 + @pytest.mark.asyncio async def test_search_events_zero_limit( self, event_service, mock_conversation_with_events