diff --git a/openhands-tools/openhands/tools/task/manager.py b/openhands-tools/openhands/tools/task/manager.py
index fc6685ab98..daf6fbec18 100644
--- a/openhands-tools/openhands/tools/task/manager.py
+++ b/openhands-tools/openhands/tools/task/manager.py
@@ -10,6 +10,7 @@
 if the task is resumed for further work later.
 """
 
+import json
 import shutil
 import tempfile
 import threading
@@ -19,7 +20,7 @@
 from pathlib import Path
 from typing import TYPE_CHECKING, Final
 
-from pydantic import BaseModel, ConfigDict, Field
+from pydantic import BaseModel, ConfigDict, Field, ValidationError
 
 from openhands.sdk import Agent
 from openhands.sdk.conversation.impl.local_conversation import LocalConversation
@@ -43,6 +44,7 @@
 logger = get_logger(__name__)
 
 _SUBAGENTS_DIR: Final[str] = "subagents"
+_INDEX_FILENAME: Final[str] = "task_index.json"
 
 
 class TaskStatus(StrEnum):
@@ -134,6 +136,69 @@ def _ensure_parent(self, conversation: LocalConversation) -> None:
                 self._persistence_dir = Path(
                     tempfile.mkdtemp(prefix="openhands_tasks_")
                 )
+            # Rehydrate prior tasks so resume works across a process restart.
+            self._load_index()
+
+    @property
+    def _index_path(self) -> Path | None:
+        """Path to the persisted task index, or None when the parent conversation
+        does not persist (the index would not survive a restart anyway). Derived
+        the same way as close(), so there is no separate state to keep in sync."""
+        parent = self._parent_conversation
+        if self._persistence_dir is None or parent is None:
+            return None
+        if parent.state.persistence_dir is None:
+            return None
+        return self._persistence_dir / _INDEX_FILENAME
+
+    def _save_index(self) -> None:
+        """Persist the task_id -> conversation_id index so resume survives a
+        process restart. No-op when the parent does not persist.
+
+        The snapshot and the write both happen under ``_tasks_lock``: every
+        saver uses the same temp file name, so unserialized writers could
+        publish a half-written or stale index. I/O errors are logged, not
+        raised."""
+        index_path = self._index_path
+        if index_path is None:
+            return
+        tmp_path = index_path.parent / f"{index_path.name}.tmp"
+        with self._tasks_lock:
+            payload = [task.model_dump(mode="json") for task in self._tasks.values()]
+            try:
+                tmp_path.write_text(json.dumps(payload), encoding="utf-8")
+                tmp_path.replace(index_path)  # atomic on the same filesystem
+            except OSError as e:
+                logger.warning(f"Failed to persist task index: {e}")
+
+    def _load_index(self) -> None:
+        """Rehydrate the task index from disk. Conversations are reconstructed
+        lazily on resume; entries load with conversation=None. Tolerates a
+        missing, unreadable, or malformed index."""
+        index_path = self._index_path
+        if index_path is None or not index_path.exists():
+            return
+        try:
+            payload = json.loads(index_path.read_text(encoding="utf-8"))
+        except (OSError, ValueError) as e:
+            logger.warning(f"Failed to read task index, ignoring: {e}")
+            return
+        if not isinstance(payload, list):
+            # Valid JSON of the wrong shape (null, number, object) cannot be
+            # iterated as task entries; treat it like a corrupt file.
+            logger.warning(
+                f"Ignoring task index: expected a list, got {type(payload).__name__}"
+            )
+            return
+        with self._tasks_lock:
+            for entry in payload:
+                try:
+                    task = Task.model_validate(entry)
+                except ValidationError as e:
+                    logger.warning(f"Skipping invalid task index entry: {e}")
+                    continue
+                # Do not clobber a live in-memory task with a disk snapshot.
+                self._tasks.setdefault(task.id, task)
 
     @property
     def parent_conversation(self) -> LocalConversation:
@@ -157,6 +222,8 @@ def _evict_task(self, task: Task) -> None:
             task.conversation.close()
         with self._tasks_lock:
             self._tasks[task.id] = task.model_copy(update={"conversation": None})
+        # Persist the final status (completed/error) for cross-process visibility.
+        self._save_index()
 
     def start_task(
         self,
@@ -229,8 +296,10 @@ def _resume_task(self, resume: str, subagent_type: str) -> Task:
                     "status": TaskStatus.RUNNING,
                 }
             )
+            task = self._tasks[resume]
 
-        return self._tasks[resume]
+        self._save_index()
+        return task
 
     def _create_task(
         self,
@@ -269,13 +338,16 @@ def _create_task(
             factory.definition.get_confirmation_policy(),
         )
 
-        self._tasks[task_id] = Task(
+        task = Task(
             id=task_id,
             conversation_id=conversation_id,
             conversation=sub_conversation,
             status=TaskStatus.RUNNING,
         )
-        return self._tasks[task_id]
+        self._tasks[task_id] = task
+
+        self._save_index()
+        return task
 
     def _get_conversation(
         self,
diff --git a/tests/tools/task/test_task_manager.py b/tests/tools/task/test_task_manager.py
index 523d80c394..35c4cb851a 100644
--- a/tests/tools/task/test_task_manager.py
+++ b/tests/tools/task/test_task_manager.py
@@ -1,3 +1,4 @@
+import json
 import uuid
 from pathlib import Path
 from unittest.mock import MagicMock, patch
@@ -15,6 +16,7 @@
 from openhands.sdk.subagent.schema import AgentDefinition
 from openhands.tools.preset import register_builtins_agents
 from openhands.tools.task.manager import (
+    _INDEX_FILENAME,
     Task,
     TaskManager,
     TaskStatus,
@@ -942,3 +944,119 @@ def test_with_persistence_subagent_conv_stored_under_subagents(self, tmp_path):
         conv_persistence = conv.state.persistence_dir
         assert conv_persistence is not None
         assert str(conv_persistence).startswith(str(manager._persistence_dir))
+
+
+class TestTaskIndexPersistence:
+    """Cross-process resume: the task_id->conversation_id index is persisted under
+    the parent conversation's dir and rehydrated by a fresh TaskManager."""
+
+    def _parent(self, persist, work, conversation_id):
+        agent = Agent(llm=_make_llm(), tools=[])
+        return LocalConversation(
+            agent=agent,
+            workspace=str(work),
+            visualizer=None,
+            delete_on_close=False,
+            persistence_dir=str(persist),
+            conversation_id=conversation_id,
+        )
+
+    def test_index_written_when_parent_persists(self, tmp_path):
+        register_builtins_agents()
+        persist, work = tmp_path / "p", tmp_path / "w"
+        persist.mkdir()
+        work.mkdir()
+        m = TaskManager()
+        m._ensure_parent(self._parent(persist, work, uuid.uuid4()))
+        task = m._create_task(subagent_type="general-purpose", description=None)
+
+        index_path = m._index_path
+        assert index_path is not None and index_path.name == _INDEX_FILENAME
+        assert index_path.exists()
+        entries = json.loads(index_path.read_text())
+        assert any(e["id"] == task.id for e in entries)
+
+    def test_no_index_when_parent_not_persisting(self, tmp_path):
+        register_builtins_agents()
+        agent = Agent(llm=_make_llm(), tools=[])
+        parent = LocalConversation(
+            agent=agent, workspace=str(tmp_path), visualizer=None, delete_on_close=False
+        )
+        m = TaskManager()
+        m._ensure_parent(parent)
+        m._create_task(subagent_type="general-purpose", description=None)
+        assert m._index_path is None
+
+    def test_resume_across_fresh_manager(self, tmp_path):
+        """A new TaskManager (simulating a process restart) rehydrates the index
+        from the same parent dir and can resume a prior task."""
+        register_builtins_agents()
+        persist, work = tmp_path / "p", tmp_path / "w"
+        persist.mkdir()
+        work.mkdir()
+        pcid = uuid.uuid4()  # parent conversation id is stable across restart
+
+        m1 = TaskManager()
+        m1._ensure_parent(self._parent(persist, work, pcid))
+        task = m1._create_task(subagent_type="general-purpose", description=None)
+        tid, cid = task.id, task.conversation_id
+        m1._evict_task(task)
+
+        m2 = TaskManager()
+        m2._ensure_parent(self._parent(persist, work, pcid))
+        assert tid in m2._tasks  # rehydrated from disk
+        resumed = m2._resume_task(resume=tid, subagent_type="general-purpose")
+        assert resumed.id == tid
+        assert resumed.conversation_id == cid
+        assert resumed.conversation is not None
+        assert resumed.conversation.state.id == cid
+
+    def test_evicted_status_persisted(self, tmp_path):
+        register_builtins_agents()
+        persist, work = tmp_path / "p", tmp_path / "w"
+        persist.mkdir()
+        work.mkdir()
+        m = TaskManager()
+        m._ensure_parent(self._parent(persist, work, uuid.uuid4()))
+        task = m._create_task(subagent_type="general-purpose", description=None)
+        task.set_result("done")
+        m._evict_task(task)
+        index_path = m._index_path
+        assert index_path is not None
+        entries = json.loads(index_path.read_text())
+        entry = next(e for e in entries if e["id"] == task.id)
+        assert entry["status"] == TaskStatus.COMPLETED.value
+
+    def test_load_index_tolerates_corrupt_file(self, tmp_path):
+        register_builtins_agents()
+        persist, work = tmp_path / "p", tmp_path / "w"
+        persist.mkdir()
+        work.mkdir()
+        pcid = uuid.uuid4()
+        m1 = TaskManager()
+        m1._ensure_parent(self._parent(persist, work, pcid))
+        m1._create_task(subagent_type="general-purpose", description=None)
+        # Corrupt the index, then load from a fresh manager — must not raise.
+        index_path = m1._index_path
+        assert index_path is not None
+        index_path.write_text("{not valid json")
+        m2 = TaskManager()
+        m2._ensure_parent(self._parent(persist, work, pcid))  # _load_index runs here
+        assert m2._tasks == {}
+
+    def test_load_index_tolerates_non_list_payload(self, tmp_path):
+        """Well-formed JSON of the wrong shape is ignored rather than raised on."""
+        register_builtins_agents()
+        persist, work = tmp_path / "p", tmp_path / "w"
+        persist.mkdir()
+        work.mkdir()
+        pcid = uuid.uuid4()
+        m1 = TaskManager()
+        m1._ensure_parent(self._parent(persist, work, pcid))
+        m1._create_task(subagent_type="general-purpose", description=None)
+        index_path = m1._index_path
+        assert index_path is not None
+        index_path.write_text("null")
+        m2 = TaskManager()
+        m2._ensure_parent(self._parent(persist, work, pcid))
+        assert m2._tasks == {}