Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 23 additions & 3 deletions openhands-agent-server/openhands/agent_server/event_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from pathlib import Path
from uuid import UUID, uuid4

from pydantic import ValidationError

from openhands.agent_server.conversation_lease import (
ConversationLease,
ConversationOwnershipLostError,
Expand Down Expand Up @@ -204,6 +206,18 @@ def _event_matches_filters(
return False
return True

def _get_searchable_event(self, events, index: int) -> Event | None:
try:
return events[index]
except (FileNotFoundError, UnicodeDecodeError, ValidationError) as exc:
logger.warning(
"Skipping unreadable event at index %d for conversation %s: %s",
index,
self.stored.id,
exc,
)
return None

def _search_events_sync(
self,
page_id: str | None = None,
Expand Down Expand Up @@ -259,7 +273,8 @@ def _search_events_sync(
start_index = None
else:
for i in range(total):
if events[i].id == page_id:
event = self._get_searchable_event(events, i)
if event is not None and event.id == page_id:
start_index = i
break
if start_index is None:
Expand All @@ -273,7 +288,9 @@ def _search_events_sync(
items: list[Event] = []
next_page_id: str | None = None
for i in indices:
event = events[i]
event = self._get_searchable_event(events, i)
if event is None:
continue
if not self._event_matches_filters(
event, kind, source, body, timestamp_gte_str, timestamp_lt_str
):
Expand Down Expand Up @@ -347,7 +364,10 @@ def _count_events_sync(
timestamp_lt_str = timestamp__lt.isoformat() if timestamp__lt else None

count = 0
for event in events:
for i in range(len(events)):
event = self._get_searchable_event(events, i)
if event is None:
continue
if self._event_matches_filters(
event, kind, source, body, timestamp_gte_str, timestamp_lt_str
):
Expand Down
77 changes: 77 additions & 0 deletions tests/agent_server/test_event_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from openhands.agent_server.pub_sub import Subscriber
from openhands.sdk import LLM, Agent, AgentBase, Conversation, Message
from openhands.sdk.agent import ACPAgent
from openhands.sdk.conversation.event_store import EventLog
from openhands.sdk.conversation.fifo_lock import FIFOLock
from openhands.sdk.conversation.impl.local_conversation import (
ACP_INFLIGHT_PROMPT_USER_MESSAGE_ID,
Expand All @@ -41,6 +42,7 @@
MessageEvent,
ObservationEvent,
)
from openhands.sdk.io.memory import InMemoryFileStore
from openhands.sdk.llm import MessageToolCall, TextContent
from openhands.sdk.security.confirmation_policy import NeverConfirm
from openhands.sdk.workspace import LocalWorkspace
Expand Down Expand Up @@ -314,6 +316,81 @@ async def test_search_events_large_limit(
assert len(result.items) == 5 # All available events
assert result.next_page_id is None

@pytest.mark.asyncio
async def test_search_events_skips_empty_event_files(self, event_service):
"""Unreadable persisted events should not abort event pagination."""
fs = InMemoryFileStore()
event0 = MessageEvent(
id="00000000-0000-0000-0000-000000000001",
source="user",
llm_message=Message(role="user", content=[TextContent(text="first")]),
timestamp="2026-06-16T09:00:00",
)
missing_event_id = "00000000-0000-0000-0000-000000000002"
event2 = MessageEvent(
id="00000000-0000-0000-0000-000000000003",
source="user",
llm_message=Message(role="user", content=[TextContent(text="third")]),
timestamp="2026-06-16T09:00:02",
)
fs.write(
f"events/event-00000-{event0.id}.json",
event0.model_dump_json(exclude_none=True),
)
fs.write(f"events/event-00001-{missing_event_id}.json", "")
fs.write(
f"events/event-00002-{event2.id}.json",
event2.model_dump_json(exclude_none=True),
)

conversation = MagicMock(spec=Conversation)
state = MagicMock(spec=ConversationState)
state.events = EventLog(fs)
conversation._state = state
event_service._conversation = conversation

result = await event_service.search_events(limit=10)

assert [event.id for event in result.items] == [event0.id, event2.id]
assert result.next_page_id is None

@pytest.mark.asyncio
async def test_count_events_skips_empty_event_files(self, event_service):
"""Filtered event counts should skip unreadable persisted events."""
fs = InMemoryFileStore()
event0 = MessageEvent(
id="00000000-0000-0000-0000-000000000001",
source="user",
llm_message=Message(role="user", content=[TextContent(text="first")]),
timestamp="2026-06-16T09:00:00",
)
missing_event_id = "00000000-0000-0000-0000-000000000002"
event2 = MessageEvent(
id="00000000-0000-0000-0000-000000000003",
source="user",
llm_message=Message(role="user", content=[TextContent(text="third")]),
timestamp="2026-06-16T09:00:02",
)
fs.write(
f"events/event-00000-{event0.id}.json",
event0.model_dump_json(exclude_none=True),
)
fs.write(f"events/event-00001-{missing_event_id}.json", "")
fs.write(
f"events/event-00002-{event2.id}.json",
event2.model_dump_json(exclude_none=True),
)

conversation = MagicMock(spec=Conversation)
state = MagicMock(spec=ConversationState)
state.events = EventLog(fs)
conversation._state = state
event_service._conversation = conversation

count = await event_service.count_events(source="user")

assert count == 2

@pytest.mark.asyncio
async def test_search_events_zero_limit(
self, event_service, mock_conversation_with_events
Expand Down
Loading