From 71d3833e859b7576fb4e0a1330f173491c0b3384 Mon Sep 17 00:00:00 2001 From: Alona King Date: Sun, 14 Jun 2026 14:29:48 -0400 Subject: [PATCH 1/3] feat(task): persist the sub-agent task index so resume survives a restart TaskManager.resume relied on the in-memory _tasks map, which is rebuilt empty in a new process, so a parent restart lost the ability to resume even though each sub-agent conversation's events persist on disk. Persist the task_id->task index to /subagents/task_index.json (written on create/resume/evict, atomic replace) and rehydrate it in _ensure_parent, so a fresh TaskManager for the same parent conversation can resume prior tasks. No-op when the parent does not persist; tolerant of a missing/corrupt index. --- .../openhands/tools/task/manager.py | 67 ++++++++++++- tests/tools/task/test_task_manager.py | 98 +++++++++++++++++++ 2 files changed, 161 insertions(+), 4 deletions(-) diff --git a/openhands-tools/openhands/tools/task/manager.py b/openhands-tools/openhands/tools/task/manager.py index fc6685ab98..5f3b205f20 100644 --- a/openhands-tools/openhands/tools/task/manager.py +++ b/openhands-tools/openhands/tools/task/manager.py @@ -10,6 +10,7 @@ if the task is resumed for further work later. """ +import json import shutil import tempfile import threading @@ -19,7 +20,7 @@ from pathlib import Path from typing import TYPE_CHECKING, Final -from pydantic import BaseModel, ConfigDict, Field +from pydantic import BaseModel, ConfigDict, Field, ValidationError from openhands.sdk import Agent from openhands.sdk.conversation.impl.local_conversation import LocalConversation @@ -43,6 +44,7 @@ logger = get_logger(__name__) _SUBAGENTS_DIR: Final[str] = "subagents" +_INDEX_FILENAME: Final[str] = "task_index.json" class TaskStatus(StrEnum): @@ -105,6 +107,8 @@ def __init__( # Set once in _ensure_parent: uses the parent's subagents dir # when the parent persists, otherwise a temporary directory. self._persistence_dir: Path | None = None + # True only when the parent persists (the task index is durable then). + self._persists: bool = False def attach_parent(self, conversation: LocalConversation) -> None: """Attach the parent conversation used to create sub-agent tasks. @@ -130,10 +134,58 @@ def _ensure_parent(self, conversation: LocalConversation) -> None: if parent_persistence_dir is not None: self._persistence_dir = Path(parent_persistence_dir) / _SUBAGENTS_DIR self._persistence_dir.mkdir(parents=True, exist_ok=True) + self._persists = True else: self._persistence_dir = Path( tempfile.mkdtemp(prefix="openhands_tasks_") ) + self._persists = False + # Rehydrate prior tasks so resume works across a process restart. + self._load_index() + + @property + def _index_path(self) -> Path | None: + """Path to the persisted task index, or None when not persisting.""" + if self._persists and self._persistence_dir is not None: + return self._persistence_dir / _INDEX_FILENAME + return None + + def _save_index(self) -> None: + """Persist the task_id -> conversation_id index so resume survives a + process restart. No-op when the parent does not persist.""" + index_path = self._index_path + if index_path is None: + return + with self._tasks_lock: + payload = [task.model_dump(mode="json") for task in self._tasks.values()] + tmp_path = index_path.parent / f"{index_path.name}.tmp" + try: + tmp_path.write_text(json.dumps(payload), encoding="utf-8") + tmp_path.replace(index_path) # atomic on the same filesystem + except OSError as e: + logger.warning(f"Failed to persist task index: {e}") + + def _load_index(self) -> None: + """Rehydrate the task index from disk. Conversations are reconstructed + lazily on resume; entries load with conversation=None. Tolerates a + missing or unreadable index.""" + index_path = self._index_path + if index_path is None or not index_path.exists(): + return + try: + payload = json.loads(index_path.read_text(encoding="utf-8")) + except (OSError, ValueError) as e: + logger.warning(f"Failed to read task index, ignoring: {e}") + return + with self._tasks_lock: + for entry in payload: + try: + task = Task.model_validate(entry) + except ValidationError as e: + logger.warning(f"Skipping invalid task index entry: {e}") + continue + # Do not clobber a live in-memory task with a disk snapshot. + self._tasks.setdefault(task.id, task) @property def parent_conversation(self) -> LocalConversation: @@ -157,6 +209,8 @@ def _evict_task(self, task: Task) -> None: task.conversation.close() with self._tasks_lock: self._tasks[task.id] = task.model_copy(update={"conversation": None}) + # Persist the final status (completed/error) for cross-process visibility. + self._save_index() def start_task( self, @@ -229,8 +283,10 @@ def _resume_task(self, resume: str, subagent_type: str) -> Task: "status": TaskStatus.RUNNING, } ) + task = self._tasks[resume] - return self._tasks[resume] + self._save_index() + return task def _create_task( self, @@ -269,13 +325,16 @@ def _create_task( factory.definition.get_confirmation_policy(), ) - self._tasks[task_id] = Task( + task = Task( id=task_id, conversation_id=conversation_id, conversation=sub_conversation, status=TaskStatus.RUNNING, ) - return self._tasks[task_id] + self._tasks[task_id] = task + + self._save_index() + return task def _get_conversation( self, diff --git a/tests/tools/task/test_task_manager.py b/tests/tools/task/test_task_manager.py index 523d80c394..c6989aef92 100644 --- a/tests/tools/task/test_task_manager.py +++ b/tests/tools/task/test_task_manager.py @@ -1,3 +1,4 @@ +import json import uuid from pathlib import Path from unittest.mock import MagicMock, patch @@ -15,6 +16,7 @@ from openhands.sdk.subagent.schema import AgentDefinition from openhands.tools.preset import register_builtins_agents from openhands.tools.task.manager import ( + _INDEX_FILENAME, Task, TaskManager, TaskStatus, @@ -942,3 +944,99 @@ def test_with_persistence_subagent_conv_stored_under_subagents(self, tmp_path): conv_persistence = conv.state.persistence_dir assert conv_persistence is not None assert str(conv_persistence).startswith(str(manager._persistence_dir)) + + +class TestTaskIndexPersistence: + """Cross-process resume: the task_id->conversation_id index is persisted under + the parent conversation's dir and rehydrated by a fresh TaskManager.""" + + def _parent(self, persist, work, conversation_id): + agent = Agent(llm=_make_llm(), tools=[]) + return LocalConversation( + agent=agent, + workspace=str(work), + visualizer=None, + delete_on_close=False, + persistence_dir=str(persist), + conversation_id=conversation_id, + ) + + def test_index_written_when_parent_persists(self, tmp_path): + register_builtins_agents() + persist, work = tmp_path / "p", tmp_path / "w" + persist.mkdir() + work.mkdir() + m = TaskManager() + m._ensure_parent(self._parent(persist, work, uuid.uuid4())) + task = m._create_task(subagent_type="general-purpose", description=None) + + assert m._persists is True + assert m._index_path is not None and m._index_path.name == _INDEX_FILENAME + assert m._index_path.exists() + entries = json.loads(m._index_path.read_text()) + assert any(e["id"] == task.id for e in entries) + + def test_no_index_when_parent_not_persisting(self, tmp_path): + register_builtins_agents() + agent = Agent(llm=_make_llm(), tools=[]) + parent = LocalConversation( + agent=agent, workspace=str(tmp_path), visualizer=None, delete_on_close=False + ) + m = TaskManager() + m._ensure_parent(parent) + m._create_task(subagent_type="general-purpose", description=None) + assert m._persists is False + assert m._index_path is None + + def test_resume_across_fresh_manager(self, tmp_path): + """A new TaskManager (simulating a process restart) rehydrates the index + from the same parent dir and can resume a prior task.""" + register_builtins_agents() + persist, work = tmp_path / "p", tmp_path / "w" + persist.mkdir() + work.mkdir() + pcid = uuid.uuid4() # parent conversation id is stable across restart + + m1 = TaskManager() + m1._ensure_parent(self._parent(persist, work, pcid)) + task = m1._create_task(subagent_type="general-purpose", description=None) + tid, cid = task.id, task.conversation_id + m1._evict_task(task) + + m2 = TaskManager() + m2._ensure_parent(self._parent(persist, work, pcid)) + assert tid in m2._tasks # rehydrated from disk + resumed = m2._resume_task(resume=tid, subagent_type="general-purpose") + assert resumed.id == tid + assert resumed.conversation_id == cid + assert resumed.conversation is not None + assert resumed.conversation.state.id == cid + + def test_evicted_status_persisted(self, tmp_path): + register_builtins_agents() + persist, work = tmp_path / "p", tmp_path / "w" + persist.mkdir() + work.mkdir() + m = TaskManager() + m._ensure_parent(self._parent(persist, work, uuid.uuid4())) + task = m._create_task(subagent_type="general-purpose", description=None) + task.set_result("done") + m._evict_task(task) + entries = json.loads(m._index_path.read_text()) + entry = next(e for e in entries if e["id"] == task.id) + assert entry["status"] == TaskStatus.COMPLETED.value + + def test_load_index_tolerates_corrupt_file(self, tmp_path): + register_builtins_agents() + persist, work = tmp_path / "p", tmp_path / "w" + persist.mkdir() + work.mkdir() + pcid = uuid.uuid4() + m1 = TaskManager() + m1._ensure_parent(self._parent(persist, work, pcid)) + m1._create_task(subagent_type="general-purpose", description=None) + # Corrupt the index, then load from a fresh manager — must not raise. + m1._index_path.write_text("{not valid json") + m2 = TaskManager() + m2._ensure_parent(self._parent(persist, work, pcid)) # _load_index runs here + assert m2._tasks == {} From 07c94f54b43336bd2013889bf3b3299db9b63855 Mon Sep 17 00:00:00 2001 From: Alona King Date: Sun, 14 Jun 2026 14:44:03 -0400 Subject: [PATCH 2/3] test(task): narrow _index_path via locals to satisfy pyright --- tests/tools/task/test_task_manager.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/tests/tools/task/test_task_manager.py b/tests/tools/task/test_task_manager.py index c6989aef92..3b4c1e1d5e 100644 --- a/tests/tools/task/test_task_manager.py +++ b/tests/tools/task/test_task_manager.py @@ -971,9 +971,10 @@ def test_index_written_when_parent_persists(self, tmp_path): task = m._create_task(subagent_type="general-purpose", description=None) assert m._persists is True - assert m._index_path is not None and m._index_path.name == _INDEX_FILENAME - assert m._index_path.exists() - entries = json.loads(m._index_path.read_text()) + index_path = m._index_path + assert index_path is not None and index_path.name == _INDEX_FILENAME + assert index_path.exists() + entries = json.loads(index_path.read_text()) assert any(e["id"] == task.id for e in entries) def test_no_index_when_parent_not_persisting(self, tmp_path): @@ -1022,7 +1023,9 @@ def test_evicted_status_persisted(self, tmp_path): task = m._create_task(subagent_type="general-purpose", description=None) task.set_result("done") m._evict_task(task) - entries = json.loads(m._index_path.read_text()) + index_path = m._index_path + assert index_path is not None + entries = json.loads(index_path.read_text()) entry = next(e for e in entries if e["id"] == task.id) assert entry["status"] == TaskStatus.COMPLETED.value @@ -1036,7 +1039,9 @@ def test_load_index_tolerates_corrupt_file(self, tmp_path): m1._ensure_parent(self._parent(persist, work, pcid)) m1._create_task(subagent_type="general-purpose", description=None) # Corrupt the index, then load from a fresh manager — must not raise. - m1._index_path.write_text("{not valid json") + index_path = m1._index_path + assert index_path is not None + index_path.write_text("{not valid json") m2 = TaskManager() m2._ensure_parent(self._parent(persist, work, pcid)) # _load_index runs here assert m2._tasks == {} From 15088b8dc37cee988bcd11f40d5500642f5fa49a Mon Sep 17 00:00:00 2001 From: Alona King Date: Sun, 14 Jun 2026 15:18:50 -0400 Subject: [PATCH 3/3] refactor(task): derive index persistence from parent instead of a cached flag Address review: drop the _persists bool and compute durability in _index_path the same way close() does (parent.state.persistence_dir is not None), so there is no duplicated state to keep in sync. --- openhands-tools/openhands/tools/task/manager.py | 17 +++++++++-------- tests/tools/task/test_task_manager.py | 2 -- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/openhands-tools/openhands/tools/task/manager.py b/openhands-tools/openhands/tools/task/manager.py index 5f3b205f20..daf6fbec18 100644 --- a/openhands-tools/openhands/tools/task/manager.py +++ b/openhands-tools/openhands/tools/task/manager.py @@ -107,8 +107,6 @@ def __init__( # Set once in _ensure_parent: uses the parent's subagents dir # when the parent persists, otherwise a temporary directory. self._persistence_dir: Path | None = None - # True only when the parent persists (the task index is durable then). - self._persists: bool = False def attach_parent(self, conversation: LocalConversation) -> None: """Attach the parent conversation used to create sub-agent tasks. @@ -134,21 +132,24 @@ def _ensure_parent(self, conversation: LocalConversation) -> None: if parent_persistence_dir is not None: self._persistence_dir = Path(parent_persistence_dir) / _SUBAGENTS_DIR self._persistence_dir.mkdir(parents=True, exist_ok=True) - self._persists = True else: self._persistence_dir = Path( tempfile.mkdtemp(prefix="openhands_tasks_") ) - self._persists = False # Rehydrate prior tasks so resume works across a process restart. self._load_index() @property def _index_path(self) -> Path | None: - """Path to the persisted task index, or None when not persisting.""" - if self._persists and self._persistence_dir is not None: - return self._persistence_dir / _INDEX_FILENAME - return None + """Path to the persisted task index, or None when the parent conversation + does not persist (the index would not survive a restart anyway). Derived + the same way as close(), so there is no separate state to keep in sync.""" + parent = self._parent_conversation + if self._persistence_dir is None or parent is None: + return None + if parent.state.persistence_dir is None: + return None + return self._persistence_dir / _INDEX_FILENAME def _save_index(self) -> None: """Persist the task_id -> conversation_id index so resume survives a diff --git a/tests/tools/task/test_task_manager.py b/tests/tools/task/test_task_manager.py index 3b4c1e1d5e..35c4cb851a 100644 --- a/tests/tools/task/test_task_manager.py +++ b/tests/tools/task/test_task_manager.py @@ -970,7 +970,6 @@ def test_index_written_when_parent_persists(self, tmp_path): m._ensure_parent(self._parent(persist, work, uuid.uuid4())) task = m._create_task(subagent_type="general-purpose", description=None) - assert m._persists is True index_path = m._index_path assert index_path is not None and index_path.name == _INDEX_FILENAME assert index_path.exists() @@ -986,7 +985,6 @@ def test_no_index_when_parent_not_persisting(self, tmp_path): m = TaskManager() m._ensure_parent(parent) m._create_task(subagent_type="general-purpose", description=None) - assert m._persists is False assert m._index_path is None def test_resume_across_fresh_manager(self, tmp_path):