Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 64 additions & 4 deletions openhands-tools/openhands/tools/task/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
if the task is resumed for further work later.
"""

import json
import shutil
import tempfile
import threading
Expand All @@ -19,7 +20,7 @@
from pathlib import Path
from typing import TYPE_CHECKING, Final

from pydantic import BaseModel, ConfigDict, Field
from pydantic import BaseModel, ConfigDict, Field, ValidationError

from openhands.sdk import Agent
from openhands.sdk.conversation.impl.local_conversation import LocalConversation
Expand All @@ -43,6 +44,7 @@
logger = get_logger(__name__)

_SUBAGENTS_DIR: Final[str] = "subagents"
_INDEX_FILENAME: Final[str] = "task_index.json"


class TaskStatus(StrEnum):
Expand Down Expand Up @@ -134,6 +136,57 @@ def _ensure_parent(self, conversation: LocalConversation) -> None:
self._persistence_dir = Path(
tempfile.mkdtemp(prefix="openhands_tasks_")
)
# Rehydrate prior tasks so resume works across a process restart.
self._load_index()

@property
def _index_path(self) -> Path | None:
"""Path to the persisted task index, or None when the parent conversation
does not persist (the index would not survive a restart anyway). Derived
the same way as close(), so there is no separate state to keep in sync."""
parent = self._parent_conversation
if self._persistence_dir is None or parent is None:
return None
if parent.state.persistence_dir is None:
return None
return self._persistence_dir / _INDEX_FILENAME

def _save_index(self) -> None:
"""Persist the task_id -> conversation_id index so resume survives a
process restart. No-op when the parent does not persist."""
index_path = self._index_path
if index_path is None:
return
with self._tasks_lock:
payload = [task.model_dump(mode="json") for task in self._tasks.values()]
tmp_path = index_path.parent / f"{index_path.name}.tmp"
try:
tmp_path.write_text(json.dumps(payload), encoding="utf-8")
tmp_path.replace(index_path) # atomic on the same filesystem
except OSError as e:
logger.warning(f"Failed to persist task index: {e}")

def _load_index(self) -> None:
"""Rehydrate the task index from disk. Conversations are reconstructed
lazily on resume; entries load with conversation=None. Tolerates a
missing or unreadable index."""
index_path = self._index_path
if index_path is None or not index_path.exists():
return
try:
payload = json.loads(index_path.read_text(encoding="utf-8"))
except (OSError, ValueError) as e:
logger.warning(f"Failed to read task index, ignoring: {e}")
return
with self._tasks_lock:
for entry in payload:
try:
task = Task.model_validate(entry)
except ValidationError as e:
logger.warning(f"Skipping invalid task index entry: {e}")
continue
# Do not clobber a live in-memory task with a disk snapshot.
self._tasks.setdefault(task.id, task)

@property
def parent_conversation(self) -> LocalConversation:
Expand All @@ -157,6 +210,8 @@ def _evict_task(self, task: Task) -> None:
task.conversation.close()
with self._tasks_lock:
self._tasks[task.id] = task.model_copy(update={"conversation": None})
# Persist the final status (completed/error) for cross-process visibility.
self._save_index()

def start_task(
self,
Expand Down Expand Up @@ -229,8 +284,10 @@ def _resume_task(self, resume: str, subagent_type: str) -> Task:
"status": TaskStatus.RUNNING,
}
)
task = self._tasks[resume]

return self._tasks[resume]
self._save_index()
return task

def _create_task(
self,
Expand Down Expand Up @@ -269,13 +326,16 @@ def _create_task(
factory.definition.get_confirmation_policy(),
)

self._tasks[task_id] = Task(
task = Task(
id=task_id,
conversation_id=conversation_id,
conversation=sub_conversation,
status=TaskStatus.RUNNING,
)
return self._tasks[task_id]
self._tasks[task_id] = task

self._save_index()
return task

def _get_conversation(
self,
Expand Down
101 changes: 101 additions & 0 deletions tests/tools/task/test_task_manager.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import uuid
from pathlib import Path
from unittest.mock import MagicMock, patch
Expand All @@ -15,6 +16,7 @@
from openhands.sdk.subagent.schema import AgentDefinition
from openhands.tools.preset import register_builtins_agents
from openhands.tools.task.manager import (
_INDEX_FILENAME,
Task,
TaskManager,
TaskStatus,
Expand Down Expand Up @@ -942,3 +944,102 @@ def test_with_persistence_subagent_conv_stored_under_subagents(self, tmp_path):
conv_persistence = conv.state.persistence_dir
assert conv_persistence is not None
assert str(conv_persistence).startswith(str(manager._persistence_dir))


class TestTaskIndexPersistence:
"""Cross-process resume: the task_id->conversation_id index is persisted under
the parent conversation's dir and rehydrated by a fresh TaskManager."""

def _parent(self, persist, work, conversation_id):
agent = Agent(llm=_make_llm(), tools=[])
return LocalConversation(
agent=agent,
workspace=str(work),
visualizer=None,
delete_on_close=False,
persistence_dir=str(persist),
conversation_id=conversation_id,
)

def test_index_written_when_parent_persists(self, tmp_path):
register_builtins_agents()
persist, work = tmp_path / "p", tmp_path / "w"
persist.mkdir()
work.mkdir()
m = TaskManager()
m._ensure_parent(self._parent(persist, work, uuid.uuid4()))
task = m._create_task(subagent_type="general-purpose", description=None)

index_path = m._index_path
assert index_path is not None and index_path.name == _INDEX_FILENAME
assert index_path.exists()
entries = json.loads(index_path.read_text())
assert any(e["id"] == task.id for e in entries)

def test_no_index_when_parent_not_persisting(self, tmp_path):
register_builtins_agents()
agent = Agent(llm=_make_llm(), tools=[])
parent = LocalConversation(
agent=agent, workspace=str(tmp_path), visualizer=None, delete_on_close=False
)
m = TaskManager()
m._ensure_parent(parent)
m._create_task(subagent_type="general-purpose", description=None)
assert m._index_path is None

def test_resume_across_fresh_manager(self, tmp_path):
"""A new TaskManager (simulating a process restart) rehydrates the index
from the same parent dir and can resume a prior task."""
register_builtins_agents()
persist, work = tmp_path / "p", tmp_path / "w"
persist.mkdir()
work.mkdir()
pcid = uuid.uuid4() # parent conversation id is stable across restart

m1 = TaskManager()
m1._ensure_parent(self._parent(persist, work, pcid))
task = m1._create_task(subagent_type="general-purpose", description=None)
tid, cid = task.id, task.conversation_id
m1._evict_task(task)

m2 = TaskManager()
m2._ensure_parent(self._parent(persist, work, pcid))
assert tid in m2._tasks # rehydrated from disk
resumed = m2._resume_task(resume=tid, subagent_type="general-purpose")
assert resumed.id == tid
assert resumed.conversation_id == cid
assert resumed.conversation is not None
assert resumed.conversation.state.id == cid

def test_evicted_status_persisted(self, tmp_path):
register_builtins_agents()
persist, work = tmp_path / "p", tmp_path / "w"
persist.mkdir()
work.mkdir()
m = TaskManager()
m._ensure_parent(self._parent(persist, work, uuid.uuid4()))
task = m._create_task(subagent_type="general-purpose", description=None)
task.set_result("done")
m._evict_task(task)
index_path = m._index_path
assert index_path is not None
entries = json.loads(index_path.read_text())
entry = next(e for e in entries if e["id"] == task.id)
assert entry["status"] == TaskStatus.COMPLETED.value

def test_load_index_tolerates_corrupt_file(self, tmp_path):
register_builtins_agents()
persist, work = tmp_path / "p", tmp_path / "w"
persist.mkdir()
work.mkdir()
pcid = uuid.uuid4()
m1 = TaskManager()
m1._ensure_parent(self._parent(persist, work, pcid))
m1._create_task(subagent_type="general-purpose", description=None)
# Corrupt the index, then load from a fresh manager — must not raise.
index_path = m1._index_path
assert index_path is not None
index_path.write_text("{not valid json")
m2 = TaskManager()
m2._ensure_parent(self._parent(persist, work, pcid)) # _load_index runs here
assert m2._tasks == {}
Loading