Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 99 additions & 9 deletions openhands-sdk/openhands/sdk/agent/acp_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import threading
import time
import uuid
from collections.abc import Generator
from collections.abc import Callable, Generator
from concurrent.futures import Future
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal, NamedTuple
Expand Down Expand Up @@ -433,6 +433,24 @@ def _image_url_to_acp_block(url: str) -> ImageContentBlock | None:
return image_block(data="", mime_type="image/png", uri=url)


def _mask_json_value(value: Any, mask: Callable[[str], str]) -> Any:
"""Recursively apply *mask* to every string leaf of a JSON-like value.

ACP tool-call ``raw_input`` / ``raw_output`` / ``content`` blocks are
arbitrary JSON (a bare string, a dict of params, a list of content
blocks). ``SecretRegistry.mask_secrets_in_output`` is a pure string op,
so walk the structure and mask each leaf string; non-string leaves
(ints, bools, ``None``) pass through unchanged.
"""
if isinstance(value, str):
return mask(value)
if isinstance(value, dict):
return {k: _mask_json_value(v, mask) for k, v in value.items()}
if isinstance(value, list):
return [_mask_json_value(v, mask) for v in value]
return value


def _serialize_tool_content(content: list[Any] | None) -> list[dict[str, Any]] | None:
"""Serialize ACP tool call content blocks to plain dicts for JSON storage."""
if not content:
Expand Down Expand Up @@ -576,6 +594,13 @@ def __init__(self) -> None:
# signal that the ACP subprocess is still actively working. Set by
# ACPAgent.step() to keep the agent-server's idle timer alive.
self.on_activity: Any = None # Callable[[], None] | None
# Secret masker — set per turn by ACPAgent to
# ``state.secret_registry.mask_secrets_in_output``. Applied to streamed
# text chunks and tool-call raw_input/raw_output/content before they
# reach ``on_token`` / ``on_event`` so a subprocess that echoes an
# injected credential never lands in the (persisted, network-relayed)
# event stream in cleartext. ``None`` ⇒ no-op (bridge used standalone).
self.mask: Callable[[str], str] | None = None
self._last_activity_signal: float = float("-inf")
# Telemetry state from UsageUpdate (persists across turns)
self._last_cost: float = 0.0 # last cumulative cost seen
Expand Down Expand Up @@ -619,6 +644,37 @@ def pop_turn_usage_update(self, session_id: str) -> Any:
self._usage_received.pop(session_id, None)
return self._turn_usage_updates.pop(session_id, None)

def _mask_value(self, value: Any) -> Any:
"""Mask injected secrets in *value* (string or JSON-like), no-op if unset.

Defensive: on mask failure, returns the original value unchanged and
logs at DEBUG — this may transiently leak the credential but prevents a
crash, matching the regular terminal tool's masking contract. (Masking
is a pure ``str.replace`` and should never raise in practice.)
"""
Comment thread
simonrosenberg marked this conversation as resolved.
if self.mask is None:
return value
try:
return _mask_json_value(value, self.mask)
except Exception:
logger.debug("secret masking failed", exc_info=True)
return value

def _mask_tool_call_entry(self, entry: dict[str, Any]) -> None:
"""Mask title / raw_input / raw_output / content of a tool-call entry.

Applied in place at ingestion (``session_update``) so the accumulator
itself never holds plaintext secrets, and every downstream emitter
(``_emit_tool_call_event`` and the supersede path in
``_cancel_inflight_tool_calls``) carries masked values for free.
``title`` is normally a benign server-set label, but a misbehaving ACP
server could echo a credential there (e.g. ``Running: curl -H
'Authorization: Bearer <token>'``), so it is masked too.
"""
for key in ("title", "raw_input", "raw_output", "content"):
if entry.get(key) is not None:
entry[key] = self._mask_value(entry[key])

# -- Client protocol methods ------------------------------------------

async def session_update(
Expand All @@ -629,16 +685,26 @@ async def session_update(
) -> None:
logger.debug("ACP session_update: type=%s", type(update).__name__)

# Route fork session updates to the fork accumulator
# Route fork session updates to the fork accumulator. ask_agent() joins
# and returns this text to the caller (a UI/network sink), so mask it
# like the main-turn path — a secret echoed in a fork session must not
# leak in cleartext.
if self._fork_session_id is not None and session_id == self._fork_session_id:
if isinstance(update, AgentMessageChunk):
if isinstance(update.content, TextContentBlock):
self._fork_accumulated_text.append(update.content.text)
self._fork_accumulated_text.append(
self._mask_value(update.content.text)
)
return

if isinstance(update, AgentMessageChunk):
if isinstance(update.content, TextContentBlock):
text = update.content.text
# Mask once, then use the masked chunk for both the persisted
# accumulation and the live ``on_token`` relay. A secret split
Comment thread
simonrosenberg marked this conversation as resolved.
# across two chunks slips through here (each piece alone won't
# match); the joined response is re-masked at the persistence
# boundary in ``_finalize_successful_turn`` to catch that.
text = self._mask_value(update.content.text)
Comment thread
simonrosenberg marked this conversation as resolved.
self.accumulated_text.append(text)
if self.on_token is not None:
try:
Expand All @@ -648,7 +714,7 @@ async def session_update(
self._maybe_signal_activity()
elif isinstance(update, AgentThoughtChunk):
if isinstance(update.content, TextContentBlock):
self.accumulated_thoughts.append(update.content.text)
self.accumulated_thoughts.append(self._mask_value(update.content.text))
elif isinstance(update, UsageUpdate):
# Store the update for step()/ask_agent() to process in one place.
self._context_window = update.size
Expand All @@ -667,6 +733,7 @@ async def session_update(
"raw_output": update.raw_output,
"content": _serialize_tool_content(update.content),
}
self._mask_tool_call_entry(entry)
self.accumulated_tool_calls.append(entry)
logger.debug("ACP tool call start: %s", update.tool_call_id)
# Emit one early "started" event — the action half of the
Expand Down Expand Up @@ -700,6 +767,12 @@ async def session_update(
target = tc
break
logger.debug("ACP tool call progress: %s", update.tool_call_id)
# Mask the merged entry on every frame so the accumulator (and thus
# the terminal event and any _cancel_inflight_tool_calls supersede)
# never carries plaintext secrets. ``status`` is left untouched, so
# the terminal-transition check below is unaffected.
if target is not None:
self._mask_tool_call_entry(target)
Comment thread
simonrosenberg marked this conversation as resolved.
# Persist exactly one terminal event per tool call. Intermediate
# progress frames each carry the *full cumulative* output; emitting
# one per frame is O(n^2) storage + WebSocket relay (the bug this
Expand Down Expand Up @@ -1404,6 +1477,14 @@ def _start_acp_server(self, state: ConversationState) -> None:
"""Start the ACP subprocess and initialize the session."""
client = _OpenHandsACPBridge()
self._client = client
# Bind the secret masker for the conversation's lifetime. It's derived
# from state.secret_registry (stable for the conversation) and is a pure
# read of _exported_values, so it has none of the cross-thread/state-lock
# hazards that make on_event/on_token strictly per-turn. Binding it here
# (rather than per-turn in _reset_client_for_turn) keeps it available for
# session updates AND for ask_agent() forks, which run on the shared
# client and may fire while no step()/astep() turn is active.
client.mask = state.secret_registry.mask_secrets_in_output

# Build the subprocess environment. Precedence, highest first:
# acp_env > state.secret_registry > agent_context.secrets
Expand Down Expand Up @@ -1717,7 +1798,8 @@ def _reset_client_for_turn(
clears them. ``on_event`` is fired from inside
``_OpenHandsACPBridge.session_update`` as tool-call notifications
arrive, so consumers see ACPToolCallEvents streamed live instead of
a single end-of-turn burst.
a single end-of-turn burst. The secret masker is bound once in
``_start_acp_server`` (conversation-stable), not here.
"""
self._client.reset()
self._client.on_token = on_token
Expand Down Expand Up @@ -2035,8 +2117,13 @@ def _finalize_successful_turn(
# matching terminal observation before the turn's FinishAction lands.
self._flush_inflight_tool_calls_as_completed()

response_text = "".join(self._client.accumulated_text)
thought_text = "".join(self._client.accumulated_thoughts)
# Re-mask the joined text at this persistence boundary: the chunks were
# already masked individually as they streamed, but a secret split
# across two chunks only reassembles in the join, so this is where it
# gets caught before landing in the persisted event stream.
mask = state.secret_registry.mask_secrets_in_output
response_text = mask("".join(self._client.accumulated_text))
Comment thread
simonrosenberg marked this conversation as resolved.
thought_text = mask("".join(self._client.accumulated_thoughts))
if not response_text:
response_text = "(No response from ACP server)"

Expand Down Expand Up @@ -2554,7 +2641,10 @@ async def _fork_and_prompt() -> str:
)
fork_elapsed = time.monotonic() - fork_t0

result = "".join(client._fork_accumulated_text)
# Re-mask the joined fork text at this return boundary — mirrors
# _finalize_successful_turn, catching a secret split across fork
# chunks that per-chunk masking can't match.
result = client._mask_value("".join(client._fork_accumulated_text))
usage_update = client.pop_turn_usage_update(fork_session_id)
self._record_usage(
response,
Expand Down
Loading
Loading