diff --git a/openhands-sdk/openhands/sdk/agent/acp_agent.py b/openhands-sdk/openhands/sdk/agent/acp_agent.py index 7c6b60a23e..42e82514fb 100644 --- a/openhands-sdk/openhands/sdk/agent/acp_agent.py +++ b/openhands-sdk/openhands/sdk/agent/acp_agent.py @@ -23,7 +23,7 @@ import threading import time import uuid -from collections.abc import Generator +from collections.abc import Callable, Generator from concurrent.futures import Future from pathlib import Path from typing import TYPE_CHECKING, Any, Literal, NamedTuple @@ -433,6 +433,24 @@ def _image_url_to_acp_block(url: str) -> ImageContentBlock | None: return image_block(data="", mime_type="image/png", uri=url) +def _mask_json_value(value: Any, mask: Callable[[str], str]) -> Any: + """Recursively apply *mask* to every string leaf of a JSON-like value. + + ACP tool-call ``raw_input`` / ``raw_output`` / ``content`` blocks are + arbitrary JSON (a bare string, a dict of params, a list of content + blocks). ``SecretRegistry.mask_secrets_in_output`` is a pure string op, + so walk the structure and mask each leaf string; non-string leaves + (ints, bools, ``None``) pass through unchanged. + """ + if isinstance(value, str): + return mask(value) + if isinstance(value, dict): + return {k: _mask_json_value(v, mask) for k, v in value.items()} + if isinstance(value, list): + return [_mask_json_value(v, mask) for v in value] + return value + + def _serialize_tool_content(content: list[Any] | None) -> list[dict[str, Any]] | None: """Serialize ACP tool call content blocks to plain dicts for JSON storage.""" if not content: @@ -576,6 +594,13 @@ def __init__(self) -> None: # signal that the ACP subprocess is still actively working. Set by # ACPAgent.step() to keep the agent-server's idle timer alive. self.on_activity: Any = None # Callable[[], None] | None + # Secret masker — set per turn by ACPAgent to + # ``state.secret_registry.mask_secrets_in_output``. Applied to streamed + # text chunks and tool-call raw_input/raw_output/content before they + # reach ``on_token`` / ``on_event`` so a subprocess that echoes an + # injected credential never lands in the (persisted, network-relayed) + # event stream in cleartext. ``None`` ⇒ no-op (bridge used standalone). + self.mask: Callable[[str], str] | None = None self._last_activity_signal: float = float("-inf") # Telemetry state from UsageUpdate (persists across turns) self._last_cost: float = 0.0 # last cumulative cost seen @@ -619,6 +644,37 @@ def pop_turn_usage_update(self, session_id: str) -> Any: self._usage_received.pop(session_id, None) return self._turn_usage_updates.pop(session_id, None) + def _mask_value(self, value: Any) -> Any: + """Mask injected secrets in *value* (string or JSON-like), no-op if unset. + + Defensive: on mask failure, returns the original value unchanged and + logs at DEBUG — this may transiently leak the credential but prevents a + crash, matching the regular terminal tool's masking contract. (Masking + is a pure ``str.replace`` and should never raise in practice.) + """ + if self.mask is None: + return value + try: + return _mask_json_value(value, self.mask) + except Exception: + logger.debug("secret masking failed", exc_info=True) + return value + + def _mask_tool_call_entry(self, entry: dict[str, Any]) -> None: + """Mask title / raw_input / raw_output / content of a tool-call entry. + + Applied in place at ingestion (``session_update``) so the accumulator + itself never holds plaintext secrets, and every downstream emitter + (``_emit_tool_call_event`` and the supersede path in + ``_cancel_inflight_tool_calls``) carries masked values for free. + ``title`` is normally a benign server-set label, but a misbehaving ACP + server could echo a credential there (e.g. ``Running: curl -H + 'Authorization: Bearer '``), so it is masked too. + """ + for key in ("title", "raw_input", "raw_output", "content"): + if entry.get(key) is not None: + entry[key] = self._mask_value(entry[key]) + # -- Client protocol methods ------------------------------------------ async def session_update( @@ -629,16 +685,26 @@ async def session_update( ) -> None: logger.debug("ACP session_update: type=%s", type(update).__name__) - # Route fork session updates to the fork accumulator + # Route fork session updates to the fork accumulator. ask_agent() joins + # and returns this text to the caller (a UI/network sink), so mask it + # like the main-turn path — a secret echoed in a fork session must not + # leak in cleartext. if self._fork_session_id is not None and session_id == self._fork_session_id: if isinstance(update, AgentMessageChunk): if isinstance(update.content, TextContentBlock): - self._fork_accumulated_text.append(update.content.text) + self._fork_accumulated_text.append( + self._mask_value(update.content.text) + ) return if isinstance(update, AgentMessageChunk): if isinstance(update.content, TextContentBlock): - text = update.content.text + # Mask once, then use the masked chunk for both the persisted + # accumulation and the live ``on_token`` relay. A secret split + # across two chunks slips through here (each piece alone won't + # match); the joined response is re-masked at the persistence + # boundary in ``_finalize_successful_turn`` to catch that. + text = self._mask_value(update.content.text) self.accumulated_text.append(text) if self.on_token is not None: try: @@ -648,7 +714,7 @@ async def session_update( self._maybe_signal_activity() elif isinstance(update, AgentThoughtChunk): if isinstance(update.content, TextContentBlock): - self.accumulated_thoughts.append(update.content.text) + self.accumulated_thoughts.append(self._mask_value(update.content.text)) elif isinstance(update, UsageUpdate): # Store the update for step()/ask_agent() to process in one place. self._context_window = update.size @@ -667,6 +733,7 @@ async def session_update( "raw_output": update.raw_output, "content": _serialize_tool_content(update.content), } + self._mask_tool_call_entry(entry) self.accumulated_tool_calls.append(entry) logger.debug("ACP tool call start: %s", update.tool_call_id) # Emit one early "started" event — the action half of the @@ -700,6 +767,12 @@ async def session_update( target = tc break logger.debug("ACP tool call progress: %s", update.tool_call_id) + # Mask the merged entry on every frame so the accumulator (and thus + # the terminal event and any _cancel_inflight_tool_calls supersede) + # never carries plaintext secrets. ``status`` is left untouched, so + # the terminal-transition check below is unaffected. + if target is not None: + self._mask_tool_call_entry(target) # Persist exactly one terminal event per tool call. Intermediate # progress frames each carry the *full cumulative* output; emitting # one per frame is O(n^2) storage + WebSocket relay (the bug this @@ -1404,6 +1477,14 @@ def _start_acp_server(self, state: ConversationState) -> None: """Start the ACP subprocess and initialize the session.""" client = _OpenHandsACPBridge() self._client = client + # Bind the secret masker for the conversation's lifetime. It's derived + # from state.secret_registry (stable for the conversation) and is a pure + # read of _exported_values, so it has none of the cross-thread/state-lock + # hazards that make on_event/on_token strictly per-turn. Binding it here + # (rather than per-turn in _reset_client_for_turn) keeps it available for + # session updates AND for ask_agent() forks, which run on the shared + # client and may fire while no step()/astep() turn is active. + client.mask = state.secret_registry.mask_secrets_in_output # Build the subprocess environment. Precedence, highest first: # acp_env > state.secret_registry > agent_context.secrets @@ -1717,7 +1798,8 @@ def _reset_client_for_turn( clears them. ``on_event`` is fired from inside ``_OpenHandsACPBridge.session_update`` as tool-call notifications arrive, so consumers see ACPToolCallEvents streamed live instead of - a single end-of-turn burst. + a single end-of-turn burst. The secret masker is bound once in + ``_start_acp_server`` (conversation-stable), not here. """ self._client.reset() self._client.on_token = on_token @@ -2035,8 +2117,13 @@ def _finalize_successful_turn( # matching terminal observation before the turn's FinishAction lands. self._flush_inflight_tool_calls_as_completed() - response_text = "".join(self._client.accumulated_text) - thought_text = "".join(self._client.accumulated_thoughts) + # Re-mask the joined text at this persistence boundary: the chunks were + # already masked individually as they streamed, but a secret split + # across two chunks only reassembles in the join, so this is where it + # gets caught before landing in the persisted event stream. + mask = state.secret_registry.mask_secrets_in_output + response_text = mask("".join(self._client.accumulated_text)) + thought_text = mask("".join(self._client.accumulated_thoughts)) if not response_text: response_text = "(No response from ACP server)" @@ -2554,7 +2641,10 @@ async def _fork_and_prompt() -> str: ) fork_elapsed = time.monotonic() - fork_t0 - result = "".join(client._fork_accumulated_text) + # Re-mask the joined fork text at this return boundary — mirrors + # _finalize_successful_turn, catching a secret split across fork + # chunks that per-chunk masking can't match. + result = client._mask_value("".join(client._fork_accumulated_text)) usage_update = client.pop_turn_usage_update(fork_session_id) self._record_usage( response, diff --git a/tests/sdk/agent/test_acp_agent.py b/tests/sdk/agent/test_acp_agent.py index ff6c3401b3..7d0e482aaf 100644 --- a/tests/sdk/agent/test_acp_agent.py +++ b/tests/sdk/agent/test_acp_agent.py @@ -23,6 +23,7 @@ _extract_session_models, _extract_token_usage, _image_url_to_acp_block, + _mask_json_value, _maybe_set_session_model, _OpenHandsACPBridge, _reapply_session_model_on_resume, @@ -6410,3 +6411,269 @@ def test_false_for_known_unsupported_provider(self, monkeypatch): agent._session_id = "sess-1" agent._agent_name = "locked-down-provider" assert agent.supports_runtime_model_switch is False + + +# --------------------------------------------------------------------------- +# Secret masking (#1023) +# --------------------------------------------------------------------------- + + +def _redacting_mask(text: str) -> str: + """Stand-in for ``secret_registry.mask_secrets_in_output``: replaces the + literal secret with the same sentinel the real registry uses.""" + return text.replace("SEKRET", "") + + +class TestMaskJsonValue: + """Unit tests for the recursive JSON masker helper.""" + + def test_masks_bare_string(self): + assert _mask_json_value("token=SEKRET", _redacting_mask) == ( + "token=" + ) + + def test_masks_nested_dict_and_list(self): + value = { + "command": "curl -H 'Authorization: Bearer SEKRET'", + "args": ["--data", "key=SEKRET"], + "count": 3, + "ok": True, + "nothing": None, + } + masked = _mask_json_value(value, _redacting_mask) + assert masked["command"] == "curl -H 'Authorization: Bearer '" + assert masked["args"] == ["--data", "key="] + # Non-string leaves pass through unchanged. + assert masked["count"] == 3 + assert masked["ok"] is True + assert masked["nothing"] is None + + def test_non_string_scalar_passthrough(self): + assert _mask_json_value(42, _redacting_mask) == 42 + assert _mask_json_value(None, _redacting_mask) is None + + +class TestACPBridgeMasking: + """``_OpenHandsACPBridge`` masks injected secrets before they reach the + ``on_token`` / ``on_event`` sinks (persisted + network-relayed).""" + + @pytest.mark.asyncio + async def test_message_chunk_masked_in_relay_and_accumulation(self): + from acp.schema import AgentMessageChunk, TextContentBlock + + client = _OpenHandsACPBridge() + client.mask = _redacting_mask + tokens: list[str] = [] + client.on_token = tokens.append + + chunk = MagicMock(spec=AgentMessageChunk) + chunk.content = MagicMock(spec=TextContentBlock) + chunk.content.text = "the token is SEKRET" + + await client.session_update("sess-1", chunk) + + assert tokens == ["the token is "] + assert client.accumulated_text == ["the token is "] + + @pytest.mark.asyncio + async def test_thought_chunk_masked(self): + from acp.schema import AgentThoughtChunk, TextContentBlock + + client = _OpenHandsACPBridge() + client.mask = _redacting_mask + + chunk = MagicMock(spec=AgentThoughtChunk) + chunk.content = MagicMock(spec=TextContentBlock) + chunk.content.text = "I will use SEKRET" + + await client.session_update("sess-1", chunk) + + assert client.accumulated_thoughts == ["I will use "] + + @pytest.mark.asyncio + async def test_tool_call_start_masks_raw_fields(self): + from acp.schema import ToolCallStart + + client = _OpenHandsACPBridge() + client.mask = _redacting_mask + events: list = [] + client.on_event = events.append + + start = MagicMock(spec=ToolCallStart) + start.tool_call_id = "tc-1" + start.title = "Running: echo SEKRET" + start.kind = "execute" + start.status = "in_progress" + start.raw_input = {"command": "echo SEKRET"} + start.raw_output = "leaked SEKRET here" + start.content = None + + await client.session_update("sess-1", start) + + assert len(events) == 1 + evt = events[0] + assert evt.title == "Running: echo " + assert evt.raw_input == {"command": "echo "} + assert evt.raw_output == "leaked here" + # The accumulator itself must hold masked values so the supersede / + # flush path can't re-leak them. + stored = client.accumulated_tool_calls[0] + assert stored["title"] == "Running: echo " + assert stored["raw_input"] == {"command": "echo "} + assert stored["raw_output"] == "leaked here" + + @pytest.mark.asyncio + async def test_tool_call_progress_masks_terminal_output(self): + from acp.schema import ToolCallProgress, ToolCallStart + + client = _OpenHandsACPBridge() + client.mask = _redacting_mask + events: list = [] + client.on_event = events.append + + start = MagicMock(spec=ToolCallStart) + start.tool_call_id = "tc-1" + start.title = "Run" + start.kind = "execute" + start.status = "in_progress" + start.raw_input = None + start.raw_output = None + start.content = None + await client.session_update("sess-1", start) + + # Terminal progress frame carries the secret in its cumulative output. + progress = MagicMock(spec=ToolCallProgress) + progress.tool_call_id = "tc-1" + progress.title = None + progress.kind = None + progress.status = "completed" + progress.raw_input = None + progress.raw_output = "result: SEKRET" + progress.content = None + await client.session_update("sess-1", progress) + + # The terminal event (emitted on the in_progress->completed transition) + # carries masked output. + assert events[-1].status == "completed" + assert events[-1].raw_output == "result: " + assert ( + client.accumulated_tool_calls[0]["raw_output"] == "result: " + ) + + @pytest.mark.asyncio + async def test_no_masking_when_mask_unset(self): + """A standalone bridge (mask is None) passes text through unchanged + and never raises.""" + from acp.schema import AgentMessageChunk, TextContentBlock + + client = _OpenHandsACPBridge() + assert client.mask is None + tokens: list[str] = [] + client.on_token = tokens.append + + chunk = MagicMock(spec=AgentMessageChunk) + chunk.content = MagicMock(spec=TextContentBlock) + chunk.content.text = "raw SEKRET" + + await client.session_update("sess-1", chunk) + + assert tokens == ["raw SEKRET"] + assert client.accumulated_text == ["raw SEKRET"] + + def test_mask_value_swallows_mask_errors(self): + """A failing masker must never crash session_update — fall back to the + original value (matches the regular terminal tool's masking contract).""" + + def _boom(_text: str) -> str: + raise RuntimeError("masker exploded") + + client = _OpenHandsACPBridge() + client.mask = _boom + assert client._mask_value("keep SEKRET") == "keep SEKRET" + + def test_reset_preserves_mask(self): + """mask is conversation-lifetime (bound once in _start_acp_server), so a + per-turn reset() must NOT clear it — unlike on_token/on_event.""" + client = _OpenHandsACPBridge() + client.mask = _redacting_mask + client.reset() + assert client.mask is _redacting_mask + + @pytest.mark.asyncio + async def test_fork_session_text_masked(self): + """ask_agent() joins _fork_accumulated_text and returns it to the + caller, so fork-session chunks must be masked too.""" + from acp.schema import AgentMessageChunk, TextContentBlock + + client = _OpenHandsACPBridge() + client.mask = _redacting_mask + client._fork_session_id = "fork-1" + + chunk = MagicMock(spec=AgentMessageChunk) + chunk.content = MagicMock(spec=TextContentBlock) + chunk.content.text = "fork says SEKRET" + + await client.session_update("fork-1", chunk) + + assert client._fork_accumulated_text == ["fork says "] + + +class TestACPStepMasksPersistedTurn: + """End-to-end: the persisted FinishAction text is masked at the join + boundary, including secrets split across streamed chunks.""" + + def _make_conversation_with_message(self, tmp_path, text="Hello"): + state = _make_state(tmp_path) + state.events.append( + SystemPromptEvent( + source="agent", + system_prompt=TextContent(text="ACP-managed agent"), + tools=[], + ) + ) + state.events.append( + MessageEvent( + source="user", + llm_message=Message(role="user", content=[TextContent(text=text)]), + ) + ) + conversation = MagicMock() + conversation.state = state + return conversation + + def test_finish_action_masks_secret_split_across_chunks(self, tmp_path): + agent = _make_agent() + conversation = self._make_conversation_with_message(tmp_path) + # Seed the mask set via the canonical registry path (get_secret_value + # records the resolved value in _exported_values) — the same path + # _start_acp_server drives for StartConversationRequest secrets. + reg = conversation.state.secret_registry + reg.update_secrets({"TOKEN": "supersecret"}) + reg.get_secret_value("TOKEN") + events: list = [] + + mock_client = _OpenHandsACPBridge() + agent._client = mock_client + agent._conn = MagicMock() + agent._session_id = "test-session" + + def _fake_run_async(_coro, **_kwargs): + # Populate accumulated_text directly, bypassing session_update (and + # thus per-chunk masking) on purpose: this isolates the join-boundary + # re-mask in _finalize_successful_turn. The secret straddles two + # chunks, so neither chunk matches alone — only the reassembled join + # does, which is exactly what the persistence-boundary mask catches. + mock_client.accumulated_text.append("the value is super") + mock_client.accumulated_text.append("secret now") + + mock_executor = MagicMock() + mock_executor.run_async = _fake_run_async + agent._executor = mock_executor + + agent.step(conversation, on_event=events.append) + + finish = next( + e for e in events if isinstance(getattr(e, "action", None), FinishAction) + ) + assert "supersecret" not in finish.action.message + assert finish.action.message == "the value is now"