Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
232 changes: 231 additions & 1 deletion openhands-tools/openhands/tools/file_editor/editor.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import base64
import hashlib
import mimetypes
import os
import re
Expand Down Expand Up @@ -60,6 +61,41 @@ class FileEditor:
_max_file_size: int
_encoding_manager: EncodingManager
_cwd: str
# SDK-6: view-coverage tracking for dedupe.
#
# Per text-file path: a sorted, merged list of disjoint line intervals
# `[(start, end), ...]` (1-indexed, inclusive on both ends) describing
# which line ranges have already been shown to the model. A new view
# whose requested range overlaps significantly with this seen-set
# returns a small hint instead of the full content.
#
# Per directory path: a set of response-text SHA256s. Directories have
# no notion of "interval"; we fall back to exact-response dedupe and
# rely on the listing-text mismatch to naturally not dedupe when the
# directory contents have changed.
#
# Why two structures: the v1 design (single per-path hash set) caught
# only exact-range repeats. Trajectory analysis of Nemotron 550B
# SWE-Bench runs (e.g. `27293667611`) showed the dominant waste pattern
# is OVERLAPPING but non-identical view_ranges of the same file —
# `_iterative.py` viewed 17 times across 15 distinct ranges, all
# clustered around three regions. Hash-based dedupe caught 2/17;
# interval-coverage dedupe is designed to catch ~12/17 on this pattern.
#
# Both structures are dropped for a path on any successful `write_file`
# to that path (i.e., on every edit), so they only suppress
# genuinely-redundant views.
_view_intervals: dict[str, list[tuple[int, int]]]
_view_listing_hashes: dict[str, set[str]]

# Fraction of a requested range that must already be covered by the
# seen-intervals set before we short-circuit. Tuned to err on the side
# of deduping aggressively: false positives cost one extra turn (model
# re-requests with a tighter range from the hint); false negatives
# cost a full file dump (5–50 KB) that's then re-paid on every
# uncached subsequent turn → quadratic. 70% is the lowest threshold
# that still leaves clear "you'll see >30% new content" semantics.
_DEDUPE_COVERAGE_THRESHOLD: float = 0.7

def __init__(
self,
Expand All @@ -79,6 +115,8 @@ def __init__(
self._max_file_size = (
(max_file_size_mb or self.MAX_FILE_SIZE_MB) * 1024 * 1024
) # Convert to bytes
self._view_intervals = {}
self._view_listing_hashes = {}

# Initialize encoding manager
self._encoding_manager = EncodingManager()
Expand All @@ -94,6 +132,190 @@ def __init__(
self._cwd = os.path.abspath(os.getcwd())
logger.info(f"FileEditor initialized with cwd: {self._cwd}")

# ------------------------------------------------------------------ #
# SDK-6: view-coverage deduplication #
# ------------------------------------------------------------------ #

_VIEW_DEDUPE_HINT_INTERVAL: str = (
"[file_editor view dedupe] You have already viewed `{path}` covering "
"lines {seen_ranges}. The requested range {req_range} is {coverage} "
"inside what you have already been shown. Scroll back to your earlier "
"`view` observation rather than re-loading. To see truly new content, "
"request a `view_range` outside {seen_ranges}. If the file has been "
"changed externally (which would not be reflected in your prior view), "
"run `cat {path}` via the terminal tool."
)

_VIEW_DEDUPE_HINT_LISTING: str = (
"[file_editor view dedupe] You have already listed `{path}` earlier in "
"this conversation with identical contents. Scroll back to your prior "
"view rather than re-loading. If you suspect the directory has "
"changed, run `ls {path}` via the terminal tool."
)

# ---- interval algebra (1-indexed, inclusive on both ends) ----

@staticmethod
def _merge_intervals(
intervals: list[tuple[int, int]],
) -> list[tuple[int, int]]:
"""Sort and union overlapping / abutting intervals.

e.g. `[(110, 130), (115, 135), (170, 180)] -> [(110, 135), (170, 180)]`.
Abutting intervals (gap of 1) are merged too, since on a line index
`(1, 10)` ∪ `(11, 20)` is contiguous from the model's perspective.
"""
if not intervals:
return []
ordered = sorted(intervals)
merged: list[tuple[int, int]] = [ordered[0]]
for s, e in ordered[1:]:
ms, me = merged[-1]
if s <= me + 1:
merged[-1] = (ms, max(me, e))
else:
merged.append((s, e))
return merged

@staticmethod
def _coverage_fraction(
requested: tuple[int, int],
seen: list[tuple[int, int]],
) -> float:
"""Fraction of lines in `requested` that are already covered by any
interval in `seen`. Assumes `seen` is merged (so its intervals are
disjoint), which lets us just sum per-interval intersections."""
rs, re_ = requested
req_len = re_ - rs + 1
if req_len <= 0:
return 1.0
covered = 0
for s, e in seen:
os_ = max(s, rs)
oe = min(e, re_)
if oe >= os_:
covered += oe - os_ + 1
return min(1.0, covered / req_len)

@staticmethod
def _format_ranges(intervals: list[tuple[int, int]]) -> str:
"""Compact rendering of intervals for the hint, e.g. `[110-185, 270-350]`."""
if not intervals:
return "[]"
return "[" + ", ".join(f"{s}-{e}" for s, e in intervals) + "]"

# ---- view interval extraction ----

def _requested_view_interval(
self,
path: Path,
view_range: list[int] | None,
) -> tuple[int, int] | None:
"""Translate a `view` call into the (start, end) line interval the
model will see, or `None` if intervals don't apply (binary/image,
unreadable file, etc.). Mirrors the normalization in `view()`."""
try:
num_lines = self._count_lines(path)
except Exception:
return None
if num_lines <= 0:
return None
if not view_range:
return (1, num_lines)
if len(view_range) != 2:
return None
start, end = view_range
if not isinstance(start, int) or not isinstance(end, int):
return None
if end == -1 or end > num_lines:
end = num_lines
if start < 1 or end < start:
return None
return (start, end)

# ---- dispatch ----

def _maybe_dedupe_view(
self,
path: Path,
view_range: list[int] | None,
obs: FileEditorObservation,
) -> FileEditorObservation:
# Never dedupe errors — the model must see the full error each retry.
if obs.is_error:
return obs
# Skip anything with non-text content (images): we can't reason about
# interval coverage and exact-hash on the text wrapper would be
# incorrect (the bytes that matter live in ImageContent).
if any(not isinstance(c, TextContent) for c in obs.content):
return obs

path_key = str(path.resolve())

# Directories: response-hash dedupe. We have no per-line model of a
# listing, and listing text changes when any child changes, so the
# natural mismatch correctly skips dedupe on a stale cache.
if path.is_dir():
return self._dedupe_listing(path, path_key, obs)

# Text files: interval-coverage dedupe.
requested = self._requested_view_interval(path, view_range)
if requested is None:
return obs
seen = self._view_intervals.get(path_key, [])
coverage = self._coverage_fraction(requested, seen)
if seen and coverage >= self._DEDUPE_COVERAGE_THRESHOLD:
logger.info(
"file_editor view dedupe: %s requested=%s coverage=%.0f%% "
"(seen=%s) — returning hint",
path,
requested,
coverage * 100,
seen,
)
return FileEditorObservation.from_text(
text=self._VIEW_DEDUPE_HINT_INTERVAL.format(
path=path,
seen_ranges=self._format_ranges(seen),
req_range=f"[{requested[0]}-{requested[1]}]",
coverage=f"{coverage:.0%}",
),
command="view",
path=str(path),
prev_exist=True,
)
self._view_intervals[path_key] = self._merge_intervals(seen + [requested])
return obs

def _dedupe_listing(
self,
path: Path,
path_key: str,
obs: FileEditorObservation,
) -> FileEditorObservation:
"""Response-hash dedupe path for directory listings."""
text_parts: list[str] = []
for item in obs.content:
if isinstance(item, TextContent):
text_parts.append(item.text)
listing_text = "\n".join(text_parts)
new_hash = hashlib.sha256(listing_text.encode("utf-8")).hexdigest()
seen = self._view_listing_hashes.setdefault(path_key, set())
if new_hash in seen:
logger.info(
"file_editor view dedupe: directory %s — identical listing "
"already returned, hinting",
path,
)
return FileEditorObservation.from_text(
text=self._VIEW_DEDUPE_HINT_LISTING.format(path=path),
command="view",
path=str(path),
prev_exist=True,
)
seen.add(new_hash)
return obs

def __call__(
self,
*,
Expand All @@ -108,7 +330,9 @@ def __call__(
_path = Path(path)
self.validate_path(command, _path)
if command == "view":
return self.view(_path, view_range)
return self._maybe_dedupe_view(
_path, view_range, self.view(_path, view_range)
)
elif command == "create":
if file_text is None:
raise EditorToolParameterMissingError(command, "file_text")
Expand Down Expand Up @@ -468,6 +692,12 @@ def write_file(self, path: Path, file_text: str, encoding: str = "utf-8") -> Non
f.write(file_text)
except Exception as e:
raise ToolError(f"Ran into {e} while trying to write to {path}") from None
# SDK-6: the file just changed; any cached coverage is stale. Drop
# the entry under both keys so the next `view` returns fresh content
# rather than a hint pointing at obsolete intervals.
path_key = str(path.resolve())
self._view_intervals.pop(path_key, None)
self._view_listing_hashes.pop(path_key, None)

@with_encoding
def insert(
Expand Down
Loading
Loading