From c32f5dc99430ef9eae52b1ceb490a4e54e3e5bf5 Mon Sep 17 00:00:00 2001 From: VascoSch92 Date: Wed, 17 Jun 2026 10:24:14 +0200 Subject: [PATCH 1/9] feat: add /goal SDK core (judge-driven goal-completion loop) Add openhands.sdk.conversation.goal: a conversation-level "/goal" driver that pursues an objective by running the agent, judging completion with a second LLM, and re-prompting until the goal is done or a cap is reached. - judge_goal + GoalVerdict: the reusable objective+transcript -> verdict kernel (renders the transcript, excluding the system prompt, and asks a judge LLM for a strict-JSON verdict). - GoalController: transport-agnostic continue-vs-stop decision logic and the iteration cap. - run_goal: a thin synchronous driver over the controller that composes with any existing critic (the critic governs each inner run(); this loop governs the overall objective). Self-contained, with no agent-server dependency. Includes a runnable demo under .pr/ proving the goal work lands in the same conversation history. Relates to #3569. 
--- .pr/README.md | 64 +++++++ .pr/goal_shared_history.py | 166 ++++++++++++++++++ .../sdk/conversation/goal/__init__.py | 42 +++++ .../sdk/conversation/goal/controller.py | 132 ++++++++++++++ .../openhands/sdk/conversation/goal/judge.py | 123 +++++++++++++ .../sdk/conversation/goal/prompts.py | 43 +++++ .../openhands/sdk/conversation/goal/runner.py | 57 ++++++ tests/sdk/conversation/goal/__init__.py | 0 .../conversation/goal/fixtures/events.jsonl | 10 ++ .../goal/fixtures/expected_transcript.txt | 62 +++++++ .../sdk/conversation/goal/test_controller.py | 71 ++++++++ tests/sdk/conversation/goal/test_judge.py | 95 ++++++++++ .../goal/test_render_transcript.py | 77 ++++++++ tests/sdk/conversation/goal/test_runner.py | 65 +++++++ 14 files changed, 1007 insertions(+) create mode 100644 .pr/README.md create mode 100644 .pr/goal_shared_history.py create mode 100644 openhands-sdk/openhands/sdk/conversation/goal/__init__.py create mode 100644 openhands-sdk/openhands/sdk/conversation/goal/controller.py create mode 100644 openhands-sdk/openhands/sdk/conversation/goal/judge.py create mode 100644 openhands-sdk/openhands/sdk/conversation/goal/prompts.py create mode 100644 openhands-sdk/openhands/sdk/conversation/goal/runner.py create mode 100644 tests/sdk/conversation/goal/__init__.py create mode 100644 tests/sdk/conversation/goal/fixtures/events.jsonl create mode 100644 tests/sdk/conversation/goal/fixtures/expected_transcript.txt create mode 100644 tests/sdk/conversation/goal/test_controller.py create mode 100644 tests/sdk/conversation/goal/test_judge.py create mode 100644 tests/sdk/conversation/goal/test_render_transcript.py create mode 100644 tests/sdk/conversation/goal/test_runner.py diff --git a/.pr/README.md b/.pr/README.md new file mode 100644 index 0000000000..6bc2ecf9ff --- /dev/null +++ b/.pr/README.md @@ -0,0 +1,64 @@ +# `/goal` shared-history demo + +Proves that the `/goal` loop writes into the **same** conversation history as the +main chat — it drives the 
`Conversation` you pass in, it does **not** fork or +create a sidecar conversation. + +## Run + +```bash +# Deterministic, no network (scripted TestLLMs) — always works: +uv run python .pr/goal_shared_history.py + +# Real agent doing real work (creates files, runs pytest) — opt in explicitly: +GOAL_DEMO_REAL=1 LLM_API_KEY=sk-... LLM_MODEL=gpt-5.5 \ + uv run python .pr/goal_shared_history.py +``` + +## What to look for + +The script sends a normal "main conversation" message, then runs `run_goal(...)` +on the **same** `Conversation`. The `PROOF` section at the end shows: + +``` +same conversation id .............. True +only one Conversation object ...... True (no fork was created) +event log GREW in place ........... 3 -> 7 +main-convo events still present ... True +goal objective is in THIS log ..... True +goal outcome ...................... complete (after 2 round(s)) +``` + +i.e. the goal's objective, the agent's work, the judge-driven followups, and the +completion are all appended to the **one** `conversation.state.events` log under +the **one** `conversation.id` — alongside (not replacing) the main-convo events. + +## Seeing what the LLM is doing + +The demo passes `visualizer=None` to keep the proof output clean. To watch the +agent's activity: + +- **Live**: drop `visualizer=None` (the default is `DefaultConversationVisualizer`), + and every event — messages, tool calls, observations — prints as it happens. +- **After the fact**: the script ends with a `REPLAY` section that renders the + saved history through the visualizer. Because every turn is persisted in + `conversation.state.events`, you can replay it any time: + + ```python + from openhands.sdk.conversation.visualizer import DefaultConversationVisualizer + viz = DefaultConversationVisualizer() + for event in conversation.state.events: + viz.on_event(event) + ``` + +In the deterministic (no-key) run the agent only emits scripted text, so you see +messages. 
In real mode (`GOAL_DEMO_REAL=1`) you also see the actual terminal +commands, file edits, and `pytest` output the agent runs. + +## How this maps to the agent server + +`run_goal` (used here) and the agent server's `EventService.start_goal` use the +same mechanism: they drive a single `Conversation`/`_conversation`, so every +event lands in that conversation's shared log and streams to subscribers. A +`POST /conversations/{id}/goal` endpoint runs the loop in the background on the +**existing** conversation — same history as the main chat. diff --git a/.pr/goal_shared_history.py b/.pr/goal_shared_history.py new file mode 100644 index 0000000000..9dc6495847 --- /dev/null +++ b/.pr/goal_shared_history.py @@ -0,0 +1,166 @@ +"""Runnable proof that the ``/goal`` loop writes into the SAME conversation history. + +What it does: + 1. Sends a normal "main conversation" message and runs the agent. + 2. Runs a ``/goal`` loop on the *same* ``Conversation`` object. + 3. Prints the single shared event log and checks that the main-conversation + events are still there, untouched, with the goal's objective / agent work / + judge-driven followups / completion appended after them. + +The point: ``run_goal`` drives the conversation you pass in (it does not fork or +spin up a sidecar), so everything lands in one ``conversation.state.events`` log +under one ``conversation.id``. The agent-server ``EventService.start_goal`` uses +the same mechanism on its single ``_conversation``, so this proves the property +both paths rely on. + +Run it two ways: + # Deterministic, no network (scripted TestLLMs) -- always works, quick check: + uv run python .pr/goal_shared_history.py + + # Real agent doing real work (creates files, runs pytest) -- opt in explicitly: + GOAL_DEMO_REAL=1 LLM_API_KEY=sk-... 
def dump_history(conversation, title: str) -> list:
    """Print a one-line-per-event view of the conversation log.

    A header with ``title``, the conversation id and the event count is
    printed first. Events that are ``LLMConvertibleEvent`` instances are shown
    as ``role`` plus the first 96 characters of their flattened text; every
    other event is shown by its class name only.

    Returns:
        A list snapshot of ``conversation.state.events``.
    """
    snapshot = list(conversation.state.events)
    print(f"\n===== {title} =====")
    print(f"conversation id : {conversation.id}")
    print(f"total events : {len(snapshot)}")
    for index, event in enumerate(snapshot):
        if not isinstance(event, LLMConvertibleEvent):
            print(f" [{index:>2}] {type(event).__name__}")
            continue
        message = event.to_llm_message()
        # Collapse the message to a single line so each event stays on one row.
        flattened = " ".join(content_to_str(message.content))
        flattened = flattened.strip().replace("\n", " ")
        print(f" [{index:>2}] {message.role:<9} {flattened[:96]}")
    return snapshot
def main() -> None:
    """Run one ordinary turn and then a ``/goal`` loop on a single conversation.

    The event log is dumped after each phase, followed by a ``PROOF`` summary
    comparing the two dumps, and finally every stored event is replayed
    through ``DefaultConversationVisualizer``.
    """
    from pathlib import Path

    # Real mode is explicit opt-in so the deterministic demo always works,
    # even when a (possibly stale) LLM_API_KEY is present in the environment.
    use_real_llm = os.getenv("GOAL_DEMO_REAL") == "1"
    print(f"mode: {'REAL LLM' if use_real_llm else 'DETERMINISTIC (scripted TestLLM)'}")

    agent, judge_llm, main_message, objective, max_iters = build(use_real_llm)
    workspace = tempfile.mkdtemp(prefix="goal-demo-")
    # visualizer=None keeps the output focused on the proof below.
    conversation = Conversation(
        agent=agent,
        workspace=workspace,
        visualizer=None,
        persistence_dir=Path.cwd(),
    )
    convo_id = conversation.id

    # 1) A normal "main conversation" turn.
    conversation.send_message(main_message)
    conversation.run()
    events_before = dump_history(conversation, "AFTER MAIN CONVERSATION TURN")
    main_ids = [ev.id for ev in events_before]

    # 2) A /goal loop on the SAME conversation object.
    print(f"\n>>> running /goal: {objective}\n")
    outcome = run_goal(conversation, objective, judge_llm, max_iterations=max_iters)

    events_after = dump_history(conversation, "AFTER /goal LOOP (SAME CONVERSATION)")
    all_ids = [ev.id for ev in events_after]

    # 3) Prove it is one shared history: look for the start of the objective
    # text inside any LLM-visible event of the final log.
    needle = objective[:20]
    objective_in_log = False
    for ev in events_after:
        if not isinstance(ev, LLMConvertibleEvent):
            continue
        if needle in " ".join(content_to_str(ev.to_llm_message().content)):
            objective_in_log = True
            break

    same_id = conversation.id == convo_id
    prefix_intact = all_ids[: len(main_ids)] == main_ids
    print("\n===== PROOF (shared history) =====")
    print(f"same conversation id .............. {same_id}")
    print("only one Conversation object ...... True (no fork was created)")
    print(f"event log GREW in place ........... {len(main_ids)} -> {len(all_ids)}")
    print(f"main-convo events still present ... {prefix_intact}")
    print(f"goal objective is in THIS log ..... {objective_in_log}")
    print(
        f"goal outcome ...................... {outcome.status} "
        f"(after {outcome.iterations} round(s))"
    )
    print(f"\nworkspace: {workspace}")

    # Visualize the whole thing AFTER the fact: the main turn and the goal
    # rounds are all in conversation.state.events, so they can be replayed
    # through the SDK's visualizer once the run has finished. (For LIVE
    # output instead, drop `visualizer=None` above.)
    print("\n===== REPLAY (visualizing the saved conversation) =====")
    replay = DefaultConversationVisualizer()
    for event in conversation.state.events:
        replay.on_event(event)
+ +Usage:: + + from openhands.sdk.conversation.goal import run_goal + + outcome = run_goal(conversation, "make pytest pass for mathx.py", judge_llm) +""" + +from openhands.sdk.conversation.goal.controller import ( + GoalContinue, + GoalController, + GoalDone, + GoalOutcome, + GoalStatus, + GoalStatusName, + GoalStep, +) +from openhands.sdk.conversation.goal.judge import GoalVerdict, judge_goal +from openhands.sdk.conversation.goal.runner import run_goal + + +__all__ = [ + "GoalContinue", + "GoalController", + "GoalDone", + "GoalOutcome", + "GoalStatus", + "GoalStatusName", + "GoalStep", + "GoalVerdict", + "judge_goal", + "run_goal", +] diff --git a/openhands-sdk/openhands/sdk/conversation/goal/controller.py b/openhands-sdk/openhands/sdk/conversation/goal/controller.py new file mode 100644 index 0000000000..c2bae7140b --- /dev/null +++ b/openhands-sdk/openhands/sdk/conversation/goal/controller.py @@ -0,0 +1,132 @@ +"""The transport-agnostic brain of the ``/goal`` loop. + +``GoalController`` decides -- after each agent run finishes -- whether to +continue (with a followup message) or stop (with a ``GoalOutcome``). It performs +NO I/O: a *driver* (the sync ``run_goal``, or an async agent-server task) owns +sending messages and running the agent; the controller only judges and decides. +That split lets the sync and async drivers share identical decision logic. +""" + +from __future__ import annotations + +from collections.abc import Sequence +from typing import Literal + +from pydantic import BaseModel, Field + +from openhands.sdk.conversation.goal.judge import GoalVerdict, judge_goal +from openhands.sdk.conversation.goal.prompts import FOLLOWUP_PROMPT +from openhands.sdk.event import Event +from openhands.sdk.llm import LLM +from openhands.sdk.logger import get_logger + + +logger = get_logger(__name__) + + +class GoalOutcome(BaseModel): + """Result of a ``/goal`` loop. 
class GoalController:
    """Decide, after each agent run, whether the goal loop continues or stops.

    The controller sends nothing and runs nothing itself. A driver asks
    :meth:`start` for the opening message, performs the run, and then hands the
    resulting events to :meth:`on_run_finished`. The iteration counter and the
    ``max_iterations`` cap are kept here so every driver shares them.
    """

    def __init__(
        self, objective: str, judge_llm: LLM, *, max_iterations: int = 10
    ) -> None:
        """Validate and store the loop configuration.

        Raises:
            ValueError: If ``objective`` is blank or ``max_iterations`` is
                below 1.
        """
        if not objective.strip():
            raise ValueError("Goal objective must not be empty.")
        if max_iterations < 1:
            raise ValueError("max_iterations must be >= 1.")
        self.objective = objective
        self.judge_llm = judge_llm
        self.max_iterations = max_iterations
        self.iteration = 0

    def start(self) -> str:
        """Return the opening message for the driver: the objective itself."""
        return self.objective

    def on_run_finished(self, events: Sequence[Event]) -> GoalStep:
        """Audit one finished run and return the next decision.

        The iteration counter is bumped first, then ``events`` are judged
        against the objective. A complete verdict, or reaching the iteration
        cap, yields a :class:`GoalDone`; otherwise a :class:`GoalContinue`
        carrying a followup prompt is returned.
        """
        self.iteration += 1
        verdict = judge_goal(self.judge_llm, self.objective, events)
        logger.info(
            "Goal audit %d/%d: score=%.2f complete=%s",
            self.iteration,
            self.max_iterations,
            verdict.score,
            verdict.complete,
        )

        # Completion wins over the cap: a goal finished on the last allowed
        # round is reported as "complete", not "capped".
        terminal: Literal["complete", "capped"] | None = None
        if verdict.complete:
            terminal = "complete"
        elif self.iteration >= self.max_iterations:
            terminal = "capped"

        if terminal is not None:
            outcome = GoalOutcome(
                status=terminal, iterations=self.iteration, verdict=verdict
            )
            return GoalDone(outcome=outcome)

        outstanding = verdict.missing or "Some requirements are not yet verified."
        return GoalContinue(
            followup=FOLLOWUP_PROMPT.format(
                iteration=self.iteration, missing=outstanding
            )
        )
+ +This is the reusable kernel of the goal feature: a pure +``objective + transcript -> verdict`` evaluator with no dependency on the +critic machinery. The ``/goal`` runner uses it to drive continuation, but it +can equally back a status command, a stop hook, or a server endpoint. +""" + +from __future__ import annotations + +import contextlib +import json +import re +from collections.abc import Sequence +from typing import Any + +from pydantic import BaseModel, Field + +from openhands.sdk.conversation.goal.prompts import JUDGE_PROMPT +from openhands.sdk.event import Event, LLMConvertibleEvent +from openhands.sdk.llm import LLM, Message, TextContent, content_to_str +from openhands.sdk.logger import get_logger + + +logger = get_logger(__name__) + + +class GoalVerdict(BaseModel): + """The judge's verdict on whether the objective is complete.""" + + score: float = Field( + ge=0.0, + le=1.0, + description="Probability (0-1) that the full objective is provably done.", + ) + complete: bool = Field( + description="Whether the judge considers the objective complete." + ) + missing: str = Field( + default="", + description="Concise description of what remains, or empty if complete.", + ) + + +def judge_goal(judge_llm: LLM, objective: str, events: Sequence[Event]) -> GoalVerdict: + """Audit the transcript and decide whether ``objective`` is complete. + + Args: + judge_llm: The second LLM that grades completion. + objective: The goal to audit against. + events: Conversation events (non-LLM events are ignored). + + Returns: + A GoalVerdict. On a judge response that cannot be parsed, returns a + conservative low score so the caller keeps working rather than + falsely finishing. + """ + convertible = [e for e in events if isinstance(e, LLMConvertibleEvent)] + transcript = _render_transcript(convertible) + prompt = JUDGE_PROMPT.format(objective=objective, transcript=transcript) + + # The judge only needs the verdict text. 
def _parse_verdict(message: Message) -> GoalVerdict:
    """Normalize the judge response into a GoalVerdict, conservatively.

    The reply text is tried as JSON verbatim first, then as the widest
    ``{...}`` span found inside it (to tolerate prose or code fences around
    the object). Every ambiguous case resolves toward "not complete" so a
    malformed reply cannot end the goal loop early.

    Args:
        message: The judge LLM's reply.

    Returns:
        A GoalVerdict. If no JSON object can be extracted the verdict has
        ``score=0.0``, ``complete=False`` and a fixed ``missing`` note.
    """
    raw = "\n".join(content_to_str(message.content)).strip()

    data: dict[str, Any] | None = None
    candidates = [raw]
    block = re.search(r"\{.*\}", raw, re.DOTALL)
    if block:
        candidates.append(block.group(0))
    for candidate in candidates:
        with contextlib.suppress(json.JSONDecodeError):
            parsed = json.loads(candidate)
            if isinstance(parsed, dict):
                data = parsed
                break

    if data is None:
        logger.warning("judge_goal: could not parse verdict: %r", raw)
        return GoalVerdict(
            score=0.0, complete=False, missing="Judge verdict could not be parsed."
        )

    try:
        score = float(data.get("score", 0.0))
    except (TypeError, ValueError, OverflowError):
        # OverflowError: a JSON integer too large for a float, e.g. 10**400.
        score = 0.0
    if not 0.0 <= score <= 1.0:
        # Out-of-range values clamp to the nearest bound. NaN (which
        # json.loads accepts) fails every comparison and therefore lands on
        # 0.0; a min()/max() clamp would have turned it into 1.0.
        score = 1.0 if score > 1.0 else 0.0

    complete = data.get("complete", score >= 1.0)
    if isinstance(complete, str):
        # A quoted boolean such as "false" is a non-empty string and would be
        # truthy under bool(); only the literal word "true" counts as done.
        complete = complete.strip().lower() == "true"

    return GoalVerdict(
        score=score,
        complete=bool(complete),
        missing=str(data.get("missing") or ""),
    )
def run_goal(
    conversation: BaseConversation,
    objective: str,
    judge_llm: LLM,
    *,
    max_iterations: int = 10,
) -> GoalOutcome:
    """Pursue ``objective`` on ``conversation`` until judged done or capped.

    Each round sends one message (the objective first, a followup from the
    controller afterwards), runs the conversation, and passes the
    conversation's events to a :class:`GoalController` for a decision. The
    loop ends when the controller returns a :class:`GoalDone`.

    Args:
        conversation: The conversation to drive (any agent/critic config).
        objective: The goal to pursue and audit against.
        judge_llm: The second LLM that grades completion.
        max_iterations: Hard cap on audit rounds before giving up.

    Returns:
        The :class:`GoalOutcome` carried by the controller's terminal
        decision.
    """
    controller = GoalController(objective, judge_llm, max_iterations=max_iterations)
    pending_message = controller.start()
    while True:
        conversation.send_message(pending_message)
        conversation.run()
        decision = controller.on_run_finished(conversation.state.events)
        if isinstance(decision, GoalDone):
            return decision.outcome
        pending_message = decision.followup
+{"id":"732ad0ee-1cfd-4c74-8934-ea310c76e335","timestamp":"2026-06-16T05:39:48.700734","source":"environment","key":"stats","value":{"usage_to_metrics":{"default":{"model_name":"litellm_proxy/minimax-m2.7","accumulated_cost":0.0,"max_budget_per_task":null,"accumulated_token_usage":{"model":"litellm_proxy/minimax-m2.7","prompt_tokens":35895,"completion_tokens":960,"cache_read_tokens":1115,"cache_write_tokens":0,"reasoning_tokens":0,"context_window":0,"per_turn_token":35484,"response_id":""}},"condenser":{"model_name":"litellm_proxy/minimax-m2.7","accumulated_cost":0.0,"max_budget_per_task":null,"accumulated_token_usage":{"model":"litellm_proxy/minimax-m2.7","prompt_tokens":0,"completion_tokens":0,"cache_read_tokens":0,"cache_write_tokens":0,"reasoning_tokens":0,"context_window":0,"per_turn_token":0,"response_id":""}}}},"kind":"ConversationStateUpdateEvent"} +{"id":"e54cbba2-3be2-4494-aa13-83af6044c5a7","timestamp":"2026-06-16T05:39:48.701247","source":"agent","thought":[{"cache_prompt":false,"type":"text","text":"Let me analyze the task. I need to:\n\n1. Build the session URL\n2. Extract PR info from the event payload\n3. Post a progress comment and save the comment ID\n4. Review the PR using the code-review and github-pr-review skills\n5. Update the progress comment when done\n\nLet me start by executing Steps 1 and 2.\n\n\n\nI'll help you review PR #3745 from the OpenHands/software-agent-sdk repository. Let me start by following the steps in order.\n"}],"reasoning_content":"Let me analyze the task. I need to:\n\n1. Build the session URL\n2. Extract PR info from the event payload\n3. Post a progress comment and save the comment ID\n4. Review the PR using the code-review and github-pr-review skills\n5. 
Update the progress comment when done\n\nLet me start by executing Steps 1 and 2.\n","thinking_blocks":[],"responses_reasoning_item":null,"action":{"command":"SESSION_URL=\"${AUTOMATION_SESSION_URL:-${AUTOMATION_API_URL:-https://app.all-hands.dev}}\"\necho \"SESSION_URL=${SESSION_URL}\"","is_input":false,"timeout":null,"reset":false,"kind":"TerminalAction"},"tool_name":"terminal","tool_call_id":"chatcmpl-tool-9d0341c7a033ae28","tool_call":{"id":"chatcmpl-tool-9d0341c7a033ae28","responses_item_id":null,"name":"terminal","arguments":"{\"security_risk\": \"LOW\", \"summary\": \"Build session URL from environment\", \"command\": \"SESSION_URL=\\\"${AUTOMATION_SESSION_URL:-${AUTOMATION_API_URL:-https://app.all-hands.dev}}\\\"\\necho \\\"SESSION_URL=${SESSION_URL}\\\"\"}","origin":"completion"},"llm_response_id":"0680149d625114e6a97f54b158714212","security_risk":"UNKNOWN","critic_result":null,"summary":"Build session URL from environment","kind":"ActionEvent"} +{"id":"d4f6dce6-2c6f-4559-8481-8b07ce1e775e","timestamp":"2026-06-16T05:39:49.105230","source":"environment","tool_name":"terminal","tool_call_id":"chatcmpl-tool-9d0341c7a033ae28","observation":{"content":[{"cache_prompt":false,"type":"text","text":"Cannot execute multiple commands at once.\nPlease run each command separately OR chain them into a single command via && or ;\nProvided commands:\n(1) SESSION_URL=\"${AUTOMATION_SESSION_URL:-${AUTOMATION_API_URL:-https://app.all-hands.dev}}\"\n(2) echo \"SESSION_URL=${SESSION_URL}\""}],"is_error":true,"command":"SESSION_URL=\"${AUTOMATION_SESSION_URL:-${AUTOMATION_API_URL:-https://app.all-hands.dev}}\"\necho 
\"SESSION_URL=${SESSION_URL}\"","exit_code":null,"timeout":false,"metadata":{"exit_code":-1,"pid":-1,"username":null,"hostname":null,"working_dir":null,"py_interpreter_path":null,"prefix":"","suffix":""},"full_output_save_dir":"/workspace/conversations/e18b827d3440425a9ff7353c4a6d6a3a/observations","kind":"TerminalObservation"},"action_id":"e54cbba2-3be2-4494-aa13-83af6044c5a7","kind":"ObservationEvent"} +{"id":"50f603a8-d05a-4cc7-b47f-7295091be6aa","timestamp":"2026-06-16T05:39:55.324065","source":"environment","key":"stats","value":{"usage_to_metrics":{"default":{"model_name":"litellm_proxy/minimax-m2.7","accumulated_cost":0.0,"max_budget_per_task":null,"accumulated_token_usage":{"model":"litellm_proxy/minimax-m2.7","prompt_tokens":71474,"completion_tokens":1155,"cache_read_tokens":34536,"cache_write_tokens":0,"reasoning_tokens":0,"context_window":0,"per_turn_token":35774,"response_id":""}},"condenser":{"model_name":"litellm_proxy/minimax-m2.7","accumulated_cost":0.0,"max_budget_per_task":null,"accumulated_token_usage":{"model":"litellm_proxy/minimax-m2.7","prompt_tokens":0,"completion_tokens":0,"cache_read_tokens":0,"cache_write_tokens":0,"reasoning_tokens":0,"context_window":0,"per_turn_token":0,"response_id":""}}}},"kind":"ConversationStateUpdateEvent"} +{"id":"8df85183-1da3-4677-ad2e-eb87d6f33809","timestamp":"2026-06-16T05:39:55.479027","source":"environment","tool_name":"terminal","tool_call_id":"chatcmpl-tool-9051d3d10f914188","observation":{"content":[{"cache_prompt":false,"type":"text","text":"Cannot execute multiple commands at once.\nPlease run each command separately OR chain them into a single command via && or ;\nProvided commands:\n(1) SESSION_URL=\"${AUTOMATION_SESSION_URL:-${AUTOMATION_API_URL:-https://app.all-hands.dev}}\"\n(2) echo \"SESSION_URL=${SESSION_URL}\"\n\n# Extract PR info from the event payload\n(3) PR_NUMBER=$(echo \"$AUTOMATION_EVENT_PAYLOAD\" | python3 -c \"import sys,json; p=json.load(sys.stdin); 
print(p['pull_request']['number'])\")\n(4) REPO=$(echo \"$AUTOMATION_EVENT_PAYLOAD\" | python3 -c \"import sys,json; p=json.load(sys.stdin); print(p['repository']['full_name'])\")\n(5) echo \"PR_NUMBER=${PR_NUMBER}\"\n(6) echo \"REPO=${REPO}\""}],"is_error":true,"command":"SESSION_URL=\"${AUTOMATION_SESSION_URL:-${AUTOMATION_API_URL:-https://app.all-hands.dev}}\"\necho \"SESSION_URL=${SESSION_URL}\"\n\n# Extract PR info from the event payload\nPR_NUMBER=$(echo \"$AUTOMATION_EVENT_PAYLOAD\" | python3 -c \"import sys,json; p=json.load(sys.stdin); print(p['pull_request']['number'])\")\nREPO=$(echo \"$AUTOMATION_EVENT_PAYLOAD\" | python3 -c \"import sys,json; p=json.load(sys.stdin); print(p['repository']['full_name'])\")\necho \"PR_NUMBER=${PR_NUMBER}\"\necho \"REPO=${REPO}\"","exit_code":null,"timeout":false,"metadata":{"exit_code":-1,"pid":-1,"username":null,"hostname":null,"working_dir":null,"py_interpreter_path":null,"prefix":"","suffix":""},"full_output_save_dir":"/workspace/conversations/e18b827d3440425a9ff7353c4a6d6a3a/observations","kind":"TerminalObservation"},"action_id":"70274abb-c9e1-4914-b778-29464d7bf293","kind":"ObservationEvent"} 
+{"id":"bc1cee85-3816-4b67-80a9-37c002d8b1e8","timestamp":"2026-06-16T05:40:00.385458","source":"environment","key":"stats","value":{"usage_to_metrics":{"default":{"model_name":"litellm_proxy/minimax-m2.7","accumulated_cost":0.0,"max_budget_per_task":null,"accumulated_token_usage":{"model":"litellm_proxy/minimax-m2.7","prompt_tokens":107458,"completion_tokens":1347,"cache_read_tokens":68193,"cache_write_tokens":0,"reasoning_tokens":0,"context_window":0,"per_turn_token":36176,"response_id":""}},"condenser":{"model_name":"litellm_proxy/minimax-m2.7","accumulated_cost":0.0,"max_budget_per_task":null,"accumulated_token_usage":{"model":"litellm_proxy/minimax-m2.7","prompt_tokens":0,"completion_tokens":0,"cache_read_tokens":0,"cache_write_tokens":0,"reasoning_tokens":0,"context_window":0,"per_turn_token":0,"response_id":""}}}},"kind":"ConversationStateUpdateEvent"} +{"id":"1eb65bb3-22e7-473f-855c-2dcf26ae8e08","timestamp":"2026-06-16T05:40:00.385957","source":"agent","thought":[{"cache_prompt":false,"type":"text","text":"Let me chain these properly with && and ;\n\n\n"}],"reasoning_content":"Let me chain these properly with && and ;\n","thinking_blocks":[],"responses_reasoning_item":null,"action":{"command":"SESSION_URL=\"${AUTOMATION_SESSION_URL:-${AUTOMATION_API_URL:-https://app.all-hands.dev}}\" && echo \"SESSION_URL=${SESSION_URL}\" && PR_NUMBER=$(echo \"$AUTOMATION_EVENT_PAYLOAD\" | python3 -c \"import sys,json; p=json.load(sys.stdin); print(p['pull_request']['number'])\") && REPO=$(echo \"$AUTOMATION_EVENT_PAYLOAD\" | python3 -c \"import sys,json; p=json.load(sys.stdin); print(p['repository']['full_name'])\") && echo \"PR_NUMBER=${PR_NUMBER}\" && echo \"REPO=${REPO}\"","is_input":false,"timeout":null,"reset":false,"kind":"TerminalAction"},"tool_name":"terminal","tool_call_id":"chatcmpl-tool-9fa678118a57807d","tool_call":{"id":"chatcmpl-tool-9fa678118a57807d","responses_item_id":null,"name":"terminal","arguments":"{\"security_risk\": \"LOW\", \"summary\": \"Build 
session URL and extract PR info\", \"command\": \"SESSION_URL=\\\"${AUTOMATION_SESSION_URL:-${AUTOMATION_API_URL:-https://app.all-hands.dev}}\\\" && echo \\\"SESSION_URL=${SESSION_URL}\\\" && PR_NUMBER=$(echo \\\"$AUTOMATION_EVENT_PAYLOAD\\\" | python3 -c \\\"import sys,json; p=json.load(sys.stdin); print(p['pull_request']['number'])\\\") && REPO=$(echo \\\"$AUTOMATION_EVENT_PAYLOAD\\\" | python3 -c \\\"import sys,json; p=json.load(sys.stdin); print(p['repository']['full_name'])\\\") && echo \\\"PR_NUMBER=${PR_NUMBER}\\\" && echo \\\"REPO=${REPO}\\\"\"}","origin":"completion"},"llm_response_id":"068014ab9c72170b40bca7fa74f68a89","security_risk":"UNKNOWN","critic_result":null,"summary":"Build session URL and extract PR info","kind":"ActionEvent"} +{"id":"88ece7fc-2f7b-4d51-b006-5570b76f4e19","timestamp":"2026-06-16T05:40:01.175476","source":"environment","tool_name":"terminal","tool_call_id":"chatcmpl-tool-9fa678118a57807d","observation":{"content":[{"cache_prompt":false,"type":"text","text":"SESSION_URL=\nTraceback (most recent call last):\n File \"\", line 1, in \n import sys,json; p=json.load(sys.stdin); print(p['pull_request']['number'])\n ~~~~~~~~~^^^^^^^^^^^\n File \"/usr/local/lib/python3.13/json/__init__.py\", line 298, in load\n return loads(fp.read(),\n cls=cls, object_hook=object_hook,\n parse_float=parse_float, parse_int=parse_int,\n parse_constant=parse_constant, object_pairs_hook=object_pairs_hook, **kw)\n File \"/usr/local/lib/python3.13/json/__init__.py\", line 352, in loads\n return _default_decoder.decode(s)\n ~~~~~~~~~~~~~~~~~~~~~~~^^^\n File \"/usr/local/lib/python3.13/json/decoder.py\", line 345, in decode\n obj, end = self.raw_decode(s, idx=_w(s, 0).end())\n ~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^\n File \"/usr/local/lib/python3.13/json/decoder.py\", line 363, in raw_decode\n raise JSONDecodeError(\"Expecting value\", s, err.value) from None\njson.decoder.JSONDecodeError: Expecting value: line 2 column 1 (char 
1)"}],"is_error":false,"command":"SESSION_URL=\"${AUTOMATION_SESSION_URL:-${AUTOMATION_API_URL:-https://app.all-hands.dev}}\" && echo \"SESSION_URL=${SESSION_URL}\" && PR_NUMBER=$(echo \"$AUTOMATION_EVENT_PAYLOAD\" | python3 -c \"import sys,json; p=json.load(sys.stdin); print(p['pull_request']['number'])\") && REPO=$(echo \"$AUTOMATION_EVENT_PAYLOAD\" | python3 -c \"import sys,json; p=json.load(sys.stdin); print(p['repository']['full_name'])\") && echo \"PR_NUMBER=${PR_NUMBER}\" && echo \"REPO=${REPO}\"","exit_code":1,"timeout":false,"metadata":{"exit_code":1,"pid":-1,"username":"openhands","hostname":"runtime-sfzcsbejkqbokdub-7c4d4748c7-pb2st","working_dir":"/workspace/project","py_interpreter_path":"/usr/local/bin/python","prefix":"","suffix":"\n[The command completed with exit code 1.]"},"full_output_save_dir":"/workspace/conversations/e18b827d3440425a9ff7353c4a6d6a3a/observations","kind":"TerminalObservation"},"action_id":"1eb65bb3-22e7-473f-855c-2dcf26ae8e08","kind":"ObservationEvent"} diff --git a/tests/sdk/conversation/goal/fixtures/expected_transcript.txt b/tests/sdk/conversation/goal/fixtures/expected_transcript.txt new file mode 100644 index 0000000000..a8249ce828 --- /dev/null +++ b/tests/sdk/conversation/goal/fixtures/expected_transcript.txt @@ -0,0 +1,62 @@ +assistant: Let me analyze the task. I need to: + +1. Build the session URL +2. Extract PR info from the event payload +3. Post a progress comment and save the comment ID +4. Review the PR using the code-review and github-pr-review skills +5. Update the progress comment when done + +Let me start by executing Steps 1 and 2. + + + +I'll help you review PR #3745 from the OpenHands/software-agent-sdk repository. Let me start by following the steps in order. + +tool: [An error occurred during execution.] + +Cannot execute multiple commands at once. 
+Please run each command separately OR chain them into a single command via && or ; +Provided commands: +(1) SESSION_URL="${AUTOMATION_SESSION_URL:-${AUTOMATION_API_URL:-https://app.all-hands.dev}}" +(2) echo "SESSION_URL=${SESSION_URL}" + +tool: [An error occurred during execution.] + +Cannot execute multiple commands at once. +Please run each command separately OR chain them into a single command via && or ; +Provided commands: +(1) SESSION_URL="${AUTOMATION_SESSION_URL:-${AUTOMATION_API_URL:-https://app.all-hands.dev}}" +(2) echo "SESSION_URL=${SESSION_URL}" + +# Extract PR info from the event payload +(3) PR_NUMBER=$(echo "$AUTOMATION_EVENT_PAYLOAD" | python3 -c "import sys,json; p=json.load(sys.stdin); print(p['pull_request']['number'])") +(4) REPO=$(echo "$AUTOMATION_EVENT_PAYLOAD" | python3 -c "import sys,json; p=json.load(sys.stdin); print(p['repository']['full_name'])") +(5) echo "PR_NUMBER=${PR_NUMBER}" +(6) echo "REPO=${REPO}" + +assistant: Let me chain these properly with && and ; + + +tool: SESSION_URL= +Traceback (most recent call last): + File "", line 1, in + import sys,json; p=json.load(sys.stdin); print(p['pull_request']['number']) + ~~~~~~~~~^^^^^^^^^^^ + File "/usr/local/lib/python3.13/json/__init__.py", line 298, in load + return loads(fp.read(), + cls=cls, object_hook=object_hook, + parse_float=parse_float, parse_int=parse_int, + parse_constant=parse_constant, object_pairs_hook=object_pairs_hook, **kw) + File "/usr/local/lib/python3.13/json/__init__.py", line 352, in loads + return _default_decoder.decode(s) + ~~~~~~~~~~~~~~~~~~~~~~~^^^ + File "/usr/local/lib/python3.13/json/decoder.py", line 345, in decode + obj, end = self.raw_decode(s, idx=_w(s, 0).end()) + ~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^ + File "/usr/local/lib/python3.13/json/decoder.py", line 363, in raw_decode + raise JSONDecodeError("Expecting value", s, err.value) from None +json.decoder.JSONDecodeError: Expecting value: line 2 column 1 (char 1) +[The command completed with exit 
code 1.] +[Current working directory: /workspace/project] +[Python interpreter: /usr/local/bin/python] +[Command finished with exit code 1] \ No newline at end of file diff --git a/tests/sdk/conversation/goal/test_controller.py b/tests/sdk/conversation/goal/test_controller.py new file mode 100644 index 0000000000..49b60cd977 --- /dev/null +++ b/tests/sdk/conversation/goal/test_controller.py @@ -0,0 +1,71 @@ +"""Tests for GoalController (the transport-agnostic /goal decision logic).""" + +import pytest + +from openhands.sdk.conversation.goal import ( + GoalContinue, + GoalController, + GoalDone, +) +from openhands.sdk.llm import Message, TextContent +from openhands.sdk.testing import TestLLM + + +def _judge(*texts: str) -> TestLLM: + """A judge LLM scripted to return one verdict per on_run_finished() call.""" + return TestLLM.from_messages( + [Message(role="assistant", content=[TextContent(text=t)]) for t in texts] + ) + + +def test_start_returns_objective(): + controller = GoalController("build x", _judge("{}")) + assert controller.start() == "build x" + assert controller.iteration == 0 + + +def test_continue_when_incomplete(): + controller = GoalController( + "build x", + _judge('{"score": 0.2, "complete": false, "missing": "tests"}'), + max_iterations=3, + ) + step = controller.on_run_finished([]) + assert isinstance(step, GoalContinue) + assert "tests" in step.followup + assert controller.iteration == 1 + + +def test_done_when_complete(): + controller = GoalController( + "build x", _judge('{"score": 1.0, "complete": true, "missing": ""}') + ) + step = controller.on_run_finished([]) + assert isinstance(step, GoalDone) + assert step.outcome.status == "complete" + assert step.outcome.iterations == 1 + assert step.outcome.verdict.complete + + +def test_caps_at_max_iterations(): + incomplete = '{"score": 0.1, "complete": false, "missing": "still broken"}' + controller = GoalController( + "build x", _judge(incomplete, incomplete), max_iterations=2 + ) + + assert 
isinstance(controller.on_run_finished([]), GoalContinue) # round 1 + step = controller.on_run_finished([]) # round 2 -> capped + assert isinstance(step, GoalDone) + assert step.outcome.status == "capped" + assert step.outcome.iterations == 2 + assert not step.outcome.verdict.complete + + +@pytest.mark.parametrize( + ("objective", "max_iterations"), + [(" ", 10), ("build x", 0)], + ids=["empty-objective", "bad-max-iterations"], +) +def test_validates_inputs(objective, max_iterations): + with pytest.raises(ValueError): + GoalController(objective, _judge("{}"), max_iterations=max_iterations) diff --git a/tests/sdk/conversation/goal/test_judge.py b/tests/sdk/conversation/goal/test_judge.py new file mode 100644 index 0000000000..0be4c2702f --- /dev/null +++ b/tests/sdk/conversation/goal/test_judge.py @@ -0,0 +1,95 @@ +"""Tests for judge_goal / GoalVerdict (the goal-completion judge kernel).""" + +import pytest + +from openhands.sdk.conversation.goal import GoalVerdict, judge_goal +from openhands.sdk.llm import Message, TextContent +from openhands.sdk.testing import TestLLM + + +def _judge(text: str) -> TestLLM: + """A judge LLM scripted to return a single verdict string.""" + return TestLLM.from_messages( + [Message(role="assistant", content=[TextContent(text=text)])] + ) + + +class _StreamGuardLLM(TestLLM): + """TestLLM that enforces the real LLM's streaming guard. + + Lets us prove ``judge_goal`` disables streaming before calling + ``completion()`` (plain ``TestLLM`` ignores ``stream`` entirely). 
+ """ + + def completion( + self, + messages, + tools=None, + _return_metrics=False, + add_security_risk_prediction=False, + on_token=None, + **kwargs, + ): + if self.stream and on_token is None: + raise ValueError("Streaming requires an on_token callback") + return super().completion( + messages, + tools, + _return_metrics, + add_security_risk_prediction, + on_token, + **kwargs, + ) + + +@pytest.mark.parametrize( + ("response", "score", "complete", "missing"), + [ + # exact-JSON verdict + ('{"score": 0.9, "complete": true, "missing": ""}', 0.9, True, ""), + # incomplete verdict surfaces the missing work + ( + '{"score": 0.1, "complete": false, "missing": "tests not run"}', + 0.1, + False, + "tests not run", + ), + # JSON embedded in prose / a code fence is still extracted + ( + "Here is my verdict:\n```json\n" + '{"score": 0.4, "complete": false, "missing": "lint"}\n```', + 0.4, + False, + "lint", + ), + # unparseable -> conservative (keeps the caller working) + ("I cannot decide.", 0.0, False, "Judge verdict could not be parsed."), + # out-of-range score is clamped into [0, 1] + ('{"score": 1.5, "complete": true, "missing": ""}', 1.0, True, ""), + ], + ids=["complete", "incomplete", "json-in-fence", "unparseable", "clamped-score"], +) +def test_judge_goal_parses_verdict(response, score, complete, missing): + verdict = judge_goal(_judge(response), "build it", []) + assert isinstance(verdict, GoalVerdict) + assert verdict.score == score + assert verdict.complete is complete + assert verdict.missing == missing + + +def test_judge_goal_disables_streaming_on_judge_llm(): + """A stream=True judge LLM must not trip completion()'s on_token guard.""" + llm = _StreamGuardLLM.from_messages( + [ + Message( + role="assistant", + content=[ + TextContent(text='{"score": 1.0, "complete": true, "missing": ""}') + ], + ) + ], + usage_id="judge", + stream=True, + ) + verdict = judge_goal(llm, "build x", []) # would raise without the fix + assert verdict.complete diff --git 
a/tests/sdk/conversation/goal/test_render_transcript.py b/tests/sdk/conversation/goal/test_render_transcript.py new file mode 100644 index 0000000000..784a583d23 --- /dev/null +++ b/tests/sdk/conversation/goal/test_render_transcript.py @@ -0,0 +1,77 @@ +"""One complete test of the exact transcript the judge sees, over a REAL trace. + +``fixtures/events.jsonl`` is a trimmed slice of the persisted events of an actual +conversation (a PR-review automation run: ``terminal`` tool calls + observations ++ state updates), loaded with the SDK's real deserializer +(``Event.model_validate_json``). ``fixtures/expected_transcript.txt`` is the +byte-for-byte golden of exactly what ``judge_goal`` feeds the judge -- so this +also guards the rendering and the persisted event format. +""" + +from collections import Counter +from pathlib import Path + +import pytest + +from openhands.sdk.conversation.goal import judge_goal +from openhands.sdk.conversation.goal.judge import _render_transcript +from openhands.sdk.event import Event, LLMConvertibleEvent, SystemPromptEvent +from openhands.sdk.llm import Message, TextContent +from openhands.sdk.testing import TestLLM + + +# The trace's `terminal` tool actions need openhands-tools to deserialize their +# kinds; skip in isolated openhands-sdk runs where it is not installed. +pytest.importorskip("openhands.tools.terminal") + + +_FIXTURES = Path(__file__).parent / "fixtures" + + +def _load_trace() -> list[Event]: + lines = (_FIXTURES / "events.jsonl").read_text().splitlines() + return [Event.model_validate_json(line) for line in lines if line.strip()] + + +def test_render_transcript_for_judge(): + events = _load_trace() + + # A real trace: agent tool calls + observations + (non-rendered) state updates. 
+ kinds = Counter(type(e).__name__ for e in events) + assert kinds["ActionEvent"] and kinds["ObservationEvent"] + assert kinds["ConversationStateUpdateEvent"] # present, but NOT LLM-convertible + + convertible = [e for e in events if isinstance(e, LLMConvertibleEvent)] + transcript = _render_transcript(convertible) + + # Tool calls render as: ActionEvent -> assistant reasoning turn, + # ObservationEvent -> `tool:` turn. + assert "assistant: " in transcript + assert "\n\ntool: " in transcript + # Secrets are redacted in what the judge sees. + assert "" in transcript + + # The (large) system prompt is excluded by design: prepending a + # SystemPromptEvent does not change the rendered transcript. + system_event = SystemPromptEvent( + system_prompt=TextContent(text="...big system prompt..."), + tools=[], + ) + assert _render_transcript([system_event, *convertible]) == transcript + + # The full kernel turns this transcript into a verdict (the judge's feedback). + judge = TestLLM.from_messages( + [ + Message( + role="assistant", + content=[TextContent(text='{"score": 0.8, "complete": true}')], + ) + ] + ) + verdict = judge_goal(judge, "Review PR #3745", events) + assert verdict.complete is True + assert verdict.score == 0.8 + + # Byte-for-byte: EXACTLY what judge_goal feeds the judge over the real trace. 
+ expected = (_FIXTURES / "expected_transcript.txt").read_text() + assert transcript == expected diff --git a/tests/sdk/conversation/goal/test_runner.py b/tests/sdk/conversation/goal/test_runner.py new file mode 100644 index 0000000000..f33eccbf60 --- /dev/null +++ b/tests/sdk/conversation/goal/test_runner.py @@ -0,0 +1,65 @@ +"""Tests for the /goal driver loop and command parsing.""" + +import pytest + +from openhands.sdk.agent import Agent +from openhands.sdk.conversation import Conversation +from openhands.sdk.conversation.base import BaseConversation +from openhands.sdk.conversation.goal import GoalOutcome, run_goal +from openhands.sdk.llm import Message, TextContent +from openhands.sdk.testing import TestLLM + + +def _text_llm(*texts: str) -> TestLLM: + """An LLM scripted to return content-only replies (each finishes a turn).""" + return TestLLM.from_messages( + [Message(role="assistant", content=[TextContent(text=t)]) for t in texts] + ) + + +def _conversation(*agent_turns: str) -> BaseConversation: + """A conversation whose agent finishes (content-only) on each run().""" + agent = Agent(llm=_text_llm(*agent_turns), tools=[]) + return Conversation(agent=agent) + + +_DONE = '{"score": 1.0, "complete": true, "missing": ""}' +_NOT_DONE = '{"score": 0.2, "complete": false, "missing": "tests"}' + + +@pytest.mark.parametrize( + ("agent_turns", "verdicts", "max_iterations", "status", "iterations"), + [ + # judge says "done" on the first audit + (("done",), (_DONE,), 5, "complete", 1), + # "not done" then "done" -> loops once more, then completes + (("turn 1", "turn 2"), (_NOT_DONE, _DONE), 5, "complete", 2), + # never done -> capped at max_iterations + (("turn 1", "turn 2"), (_NOT_DONE, _NOT_DONE), 2, "capped", 2), + ], + ids=["complete-first-audit", "loops-until-complete", "caps-at-max"], +) +def test_run_goal_outcomes(agent_turns, verdicts, max_iterations, status, iterations): + conversation = _conversation(*agent_turns) + outcome = run_goal( + conversation, 
"build x", _text_llm(*verdicts), max_iterations=max_iterations + ) + assert isinstance(outcome, GoalOutcome) + assert outcome.status == status + assert outcome.iterations == iterations + assert outcome.verdict.complete is (status == "complete") + + +@pytest.mark.parametrize( + ("objective", "max_iterations"), + [(" ", 5), ("build x", 0)], + ids=["empty-objective", "bad-max-iterations"], +) +def test_run_goal_rejects_invalid_input(objective, max_iterations): + with pytest.raises(ValueError): + run_goal( + _conversation("noop"), + objective, + _text_llm("{}"), + max_iterations=max_iterations, + ) From fb705c353302bc3b1813ffadca05680b7cf997c7 Mon Sep 17 00:00:00 2001 From: VascoSch92 Date: Wed, 17 Jun 2026 10:24:37 +0200 Subject: [PATCH 2/9] feat: add /goal agent-server endpoint, background loop, and stop/resume Wire the SDK /goal loop into the agent server so a UI can run, watch, stop, and resume goals on the live conversation (no fork). - EventService.start_goal/_run_goal: a background driver that drives the shared conversation and publishes ConversationStateUpdateEvent(key="goal") lifecycle updates (running/complete/capped/interrupted) for a UI chip. - stop_goal/resume_goal/_last_goal_status: cancel a running goal (a normal user message also interrupts it) and resume from the persisted iteration, including across a server restart. - POST /conversations/{id}/goal, /goal/stop, /goal/resume. Stacked on the SDK /goal core. Relates to #3569. 
--- .../agent_server/conversation_router.py | 85 +++++ .../openhands/agent_server/event_service.py | 236 +++++++++++- .../openhands/agent_server/models.py | 9 + .../agent_server/test_conversation_router.py | 154 ++++++++ tests/agent_server/test_goal_loop.py | 357 ++++++++++++++++++ 5 files changed, 839 insertions(+), 2 deletions(-) create mode 100644 tests/agent_server/test_goal_loop.py diff --git a/openhands-agent-server/openhands/agent_server/conversation_router.py b/openhands-agent-server/openhands/agent_server/conversation_router.py index afafa4beb3..8ac7ae22c1 100644 --- a/openhands-agent-server/openhands/agent_server/conversation_router.py +++ b/openhands-agent-server/openhands/agent_server/conversation_router.py @@ -34,6 +34,7 @@ SetConfirmationPolicyRequest, SetSecurityAnalyzerRequest, StartConversationRequest, + StartGoalRequest, Success, UpdateConversationRequest, UpdateSecretsRequest, @@ -286,6 +287,90 @@ async def run_conversation( return Success() +@conversation_router.post( + "/{conversation_id}/goal", + responses={ + 404: {"description": "Item not found"}, + 409: {"description": "Conversation or goal is already running"}, + }, +) +async def start_goal_conversation( + conversation_id: UUID, + request: StartGoalRequest, + conversation_service: ConversationService = Depends(get_conversation_service), +) -> Success: + """Start a ``/goal`` driver loop in the background on the conversation. + + Drives the agent toward ``objective``, judging completion after each run and + re-prompting until done or ``max_iterations``. All work lands in the same + conversation history and event stream as the main chat (it is not a fork). 
+ """ + event_service = await conversation_service.get_event_service(conversation_id) + if event_service is None: + raise HTTPException(status.HTTP_404_NOT_FOUND) + + try: + await event_service.start_goal( + request.objective, max_iterations=request.max_iterations + ) + except ValueError as e: + message = str(e) + if message in ("conversation_already_running", "goal_already_running"): + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail="Conversation or goal already running.", + ) + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message) + + return Success() + + +@conversation_router.post( + "/{conversation_id}/goal/stop", + responses={404: {"description": "Item not found"}}, +) +async def stop_goal_conversation( + conversation_id: UUID, + conversation_service: ConversationService = Depends(get_conversation_service), +) -> Success: + """Stop a running ``/goal`` loop. The goal records a resumable state.""" + event_service = await conversation_service.get_event_service(conversation_id) + if event_service is None: + raise HTTPException(status.HTTP_404_NOT_FOUND) + await event_service.stop_goal() + return Success() + + +@conversation_router.post( + "/{conversation_id}/goal/resume", + responses={ + 404: {"description": "Item not found"}, + 409: {"description": "Conversation or goal is already running"}, + }, +) +async def resume_goal_conversation( + conversation_id: UUID, + conversation_service: ConversationService = Depends(get_conversation_service), +) -> Success: + """Resume a previously interrupted ``/goal`` loop from where it left off.""" + event_service = await conversation_service.get_event_service(conversation_id) + if event_service is None: + raise HTTPException(status.HTTP_404_NOT_FOUND) + + try: + await event_service.resume_goal() + except ValueError as e: + message = str(e) + if message in ("conversation_already_running", "goal_already_running"): + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + 
detail="Conversation or goal already running.", + ) + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message) + + return Success() + + @conversation_router.post( "/{conversation_id}/secrets", responses={404: {"description": "Item not found"}} ) diff --git a/openhands-agent-server/openhands/agent_server/event_service.py b/openhands-agent-server/openhands/agent_server/event_service.py index 86d119e10a..9c2ed09821 100644 --- a/openhands-agent-server/openhands/agent_server/event_service.py +++ b/openhands-agent-server/openhands/agent_server/event_service.py @@ -17,9 +17,19 @@ StoredConversation, ) from openhands.agent_server.pub_sub import PubSub, Subscriber -from openhands.sdk import LLM, AgentBase, Event, Message, get_logger +from openhands.sdk import LLM, AgentBase, Event, Message, TextContent, get_logger from openhands.sdk.agent import ACPAgent from openhands.sdk.conversation.base import BaseConversation +from openhands.sdk.conversation.goal import ( + GoalController, + GoalDone, + GoalOutcome, + GoalStatus, + GoalStatusName, + GoalStep, + GoalVerdict, +) +from openhands.sdk.conversation.goal.prompts import RESUME_PROMPT from openhands.sdk.conversation.impl.local_conversation import ( ACP_INFLIGHT_PROMPT_USER_MESSAGE_ID, ACP_SUPERSEDE_INFLIGHT_PROMPT, @@ -92,6 +102,9 @@ class EventService: _lease_task: asyncio.Task | None = field(default=None, init=False) _external_lease_renewal: bool = field(default=False, init=False) _run_executor: ThreadPoolExecutor | None = field(default=None, init=False) + # Background driver task for an in-progress /goal loop, and its last outcome. 
+ _goal_task: asyncio.Task | None = field(default=None, init=False) + _goal_outcome: GoalOutcome | None = field(default=None, init=False) @property def conversation_dir(self): @@ -447,9 +460,16 @@ async def batch_get_events(self, event_ids: list[str]) -> list[Event | None]: ) return results - async def send_message(self, message: Message, run: bool = False): + async def send_message( + self, message: Message, run: bool = False, _from_goal: bool = False + ): if not self._conversation: raise ValueError("inactive_service") + # A user message takes back control: stop any running /goal loop first + # (it persists an "interrupted" status and can be resumed later). The + # goal loop's own messages pass _from_goal=True to skip this. + if not _from_goal: + await self.stop_goal() explicit_interrupt_generation = self._explicit_interrupt_generation loop = asyncio.get_running_loop() await loop.run_in_executor(None, self._conversation.send_message, message) @@ -989,6 +1009,209 @@ async def _run_and_publish(): # Create task but don't await it - runs in background self._run_task = asyncio.create_task(_run_and_publish()) + async def start_goal( + self, + objective: str, + *, + judge_llm: LLM | None = None, + max_iterations: int = 10, + ) -> None: + """Start a ``/goal`` driver loop in the background on this conversation. + + Sends the objective, runs the agent, and judges completion after each + run, re-prompting until the goal is done or ``max_iterations`` is + reached. All work lands in this conversation's shared event history and + streams to subscribers, exactly like a normal run -- it is *not* a fork. + + Args: + objective: The goal to pursue and audit against. + judge_llm: LLM that grades completion. Defaults to the agent's LLM. + max_iterations: Hard cap on audit rounds before giving up. + + Raises: + ValueError: If the service is inactive, a goal is already running, + no judge LLM is available, or the objective is empty. 
+ """ + if not self._conversation or self._closing: + raise ValueError("inactive_service") + if self._goal_task is not None and not self._goal_task.done(): + raise ValueError("goal_already_running") + if judge_llm is None: + judge_llm = getattr(self._conversation.agent, "llm", None) + if judge_llm is None: + raise ValueError("no_judge_llm") + # GoalController validates the objective/max_iterations (raises ValueError). + controller = GoalController(objective, judge_llm, max_iterations=max_iterations) + self._goal_outcome = None + self._goal_task = asyncio.create_task(self._run_goal(controller)) + + async def _run_goal( + self, controller: GoalController, *, resume: bool = False + ) -> None: + """Background driver loop for :meth:`start_goal` / :meth:`resume_goal`. + + Reuses the SDK's transport-agnostic ``GoalController`` for all decisions; + this method only owns I/O: sending messages, awaiting each background run, + judging off the event loop, and publishing goal-status updates. + """ + conversation = self._conversation + if conversation is None: + return + loop = asyncio.get_running_loop() + + def _snapshot_and_judge() -> GoalStep: + # Snapshot events under the conversation lock, then judge (an LLM + # call) with the lock released -- both on this worker thread. + with conversation._state: + events = list(conversation._state.events) + return controller.on_run_finished(events) + + def _user(text: str) -> Message: + return Message(role="user", content=[TextContent(text=text)]) + + async def _emit_status( + *, + active: bool, + status: GoalStatusName, + verdict: GoalVerdict | None = None, + ) -> None: + # Persist + publish a goal-status update so a UI can render a chip. + # ConversationStateUpdateEvent is not LLM-convertible, so it never + # enters the agent's or the judge's context. 
+ event = ConversationStateUpdateEvent( + key="goal", + value=GoalStatus( + active=active, + status=status, + iteration=controller.iteration, + max_iterations=controller.max_iterations, + objective=controller.objective, + verdict=verdict, + ).model_dump(), + ) + + def _persist() -> None: + with conversation._state: + conversation._on_event(event) + + await loop.run_in_executor(None, _persist) + + try: + await _emit_status(active=True, status="running") + nudge = RESUME_PROMPT if resume else controller.start() + await self.send_message(_user(nudge), run=False, _from_goal=True) + while True: + try: + await self.run() + except ValueError as e: + if str(e) != "conversation_already_running": + raise + run_task = self._run_task + if run_task is not None: + await asyncio.wait({run_task}) + status = await self._get_execution_status() + if status in ( + ConversationExecutionStatus.PAUSED, + ConversationExecutionStatus.ERROR, + ConversationExecutionStatus.STUCK, + ): + logger.info("Goal loop halted early: status=%s", status) + await _emit_status(active=False, status="interrupted") + return + step = await loop.run_in_executor(None, _snapshot_and_judge) + if isinstance(step, GoalDone): + self._goal_outcome = step.outcome + await _emit_status( + active=False, + status=step.outcome.status, + verdict=step.outcome.verdict, + ) + logger.info( + "Goal %s after %d round(s)", + step.outcome.status, + step.outcome.iterations, + ) + return + await _emit_status(active=True, status="running") + await self.send_message( + _user(step.followup), run=False, _from_goal=True + ) + except asyncio.CancelledError: + logger.info("Goal loop cancelled") + # An explicit stop / user interjection: record an interrupted status + # (resumable). Skip during close(), which is tearing the service down. 
+ if not self._closing: + with suppress(Exception): + await _emit_status(active=False, status="interrupted") + raise + except Exception: + logger.exception("Goal loop failed") + finally: + self._goal_task = None + + async def stop_goal(self) -> bool: + """Stop a running ``/goal`` loop. Returns True if one was stopped. + + The loop records an ``interrupted`` status before exiting, so it can be + resumed later via :meth:`resume_goal`. + """ + task = self._goal_task + if task is None or task.done(): + return False + task.cancel() + with suppress(asyncio.CancelledError): + await task + return True + + def _last_goal_status(self) -> dict | None: + """Return the most recent goal-status payload, or None if there is none.""" + conversation = self._conversation + if conversation is None: + return None + with conversation._state: + for event in reversed(list(conversation._state.events)): + if ( + isinstance(event, ConversationStateUpdateEvent) + and event.key == "goal" + ): + return event.value if isinstance(event.value, dict) else None + return None + + async def resume_goal( + self, *, judge_llm: LLM | None = None, max_iterations: int | None = None + ) -> None: + """Resume a previously interrupted (or crashed) ``/goal`` loop. + + Reconstructs the goal from the last persisted goal-status event and + continues from the iteration it had reached. Works within a session and + across a server restart (goal-status events are persisted). + + Raises: + ValueError: If the service is inactive, a goal is already running, + no judge LLM is available, or there is no resumable goal (none + was ever started, or the last one already completed/capped). 
+ """ + if not self._conversation or self._closing: + raise ValueError("inactive_service") + if self._goal_task is not None and not self._goal_task.done(): + raise ValueError("goal_already_running") + loop = asyncio.get_running_loop() + last = await loop.run_in_executor(None, self._last_goal_status) + if last is None or last.get("status") in ("complete", "capped"): + raise ValueError("no_resumable_goal") + if judge_llm is None: + judge_llm = getattr(self._conversation.agent, "llm", None) + if judge_llm is None: + raise ValueError("no_judge_llm") + controller = GoalController( + last["objective"], + judge_llm, + max_iterations=max_iterations or int(last["max_iterations"]), + ) + controller.iteration = int(last["iteration"]) + self._goal_outcome = None + self._goal_task = asyncio.create_task(self._run_goal(controller, resume=True)) + async def respond_to_confirmation(self, request: ConfirmationResponseRequest): if request.accept: try: @@ -1106,6 +1329,15 @@ async def close(self): self._explicit_interrupt_generation += 1 self._rerun_requested = False self._acp_internal_rerun_requested = False + + # Cancel any in-progress /goal loop first so it cannot start a new run + # while we drain the current one below. 
+ if self._goal_task is not None and not self._goal_task.done(): + self._goal_task.cancel() + with suppress(asyncio.CancelledError): + await self._goal_task + self._goal_task = None + if self._lease_task is not None: self._lease_task.cancel() with suppress(asyncio.CancelledError): diff --git a/openhands-agent-server/openhands/agent_server/models.py b/openhands-agent-server/openhands/agent_server/models.py index 35da059e8f..badc229f8e 100644 --- a/openhands-agent-server/openhands/agent_server/models.py +++ b/openhands-agent-server/openhands/agent_server/models.py @@ -473,6 +473,15 @@ class AskAgentResponse(BaseModel): response: str = Field(description="The agent's response to the question") +class StartGoalRequest(BaseModel): + """Payload to start a ``/goal`` driver loop on a conversation.""" + + objective: str = Field(description="The goal objective to pursue and audit.") + max_iterations: int = Field( + default=10, ge=1, description="Maximum audit rounds before giving up." + ) + + class AgentResponseResult(BaseModel): """The agent's final response for a conversation. 
diff --git a/tests/agent_server/test_conversation_router.py b/tests/agent_server/test_conversation_router.py index fd86b04ef6..e818e1bb71 100644 --- a/tests/agent_server/test_conversation_router.py +++ b/tests/agent_server/test_conversation_router.py @@ -1093,6 +1093,160 @@ def test_run_conversation_not_found( client.app.dependency_overrides.clear() +def test_start_goal_success( + client, mock_conversation_service, mock_event_service, sample_conversation_id +): + """start_goal endpoint forwards the objective to the event service.""" + mock_conversation_service.get_event_service.return_value = mock_event_service + mock_event_service.start_goal.return_value = None + + client.app.dependency_overrides[get_conversation_service] = ( + lambda: mock_conversation_service + ) + try: + response = client.post( + f"/api/conversations/{sample_conversation_id}/goal", + json={"objective": "build x"}, + ) + assert response.status_code == 200 + assert response.json()["success"] is True + mock_event_service.start_goal.assert_awaited_once_with( + "build x", max_iterations=10 + ) + finally: + client.app.dependency_overrides.clear() + + +def test_start_goal_not_found( + client, mock_conversation_service, sample_conversation_id +): + """start_goal returns 404 when the conversation is unknown.""" + mock_conversation_service.get_event_service.return_value = None + + client.app.dependency_overrides[get_conversation_service] = ( + lambda: mock_conversation_service + ) + try: + response = client.post( + f"/api/conversations/{sample_conversation_id}/goal", + json={"objective": "build x"}, + ) + assert response.status_code == 404 + finally: + client.app.dependency_overrides.clear() + + +def test_start_goal_already_running( + client, mock_conversation_service, mock_event_service, sample_conversation_id +): + """start_goal returns 409 when a goal/run is already in progress.""" + mock_conversation_service.get_event_service.return_value = mock_event_service + 
mock_event_service.start_goal.side_effect = ValueError("goal_already_running") + + client.app.dependency_overrides[get_conversation_service] = ( + lambda: mock_conversation_service + ) + try: + response = client.post( + f"/api/conversations/{sample_conversation_id}/goal", + json={"objective": "build x"}, + ) + assert response.status_code == 409 + finally: + client.app.dependency_overrides.clear() + + +def test_stop_goal_success( + client, mock_conversation_service, mock_event_service, sample_conversation_id +): + """stop_goal endpoint forwards to the event service.""" + mock_conversation_service.get_event_service.return_value = mock_event_service + mock_event_service.stop_goal.return_value = True + + client.app.dependency_overrides[get_conversation_service] = ( + lambda: mock_conversation_service + ) + try: + response = client.post(f"/api/conversations/{sample_conversation_id}/goal/stop") + assert response.status_code == 200 + assert response.json()["success"] is True + mock_event_service.stop_goal.assert_awaited_once() + finally: + client.app.dependency_overrides.clear() + + +def test_stop_goal_not_found(client, mock_conversation_service, sample_conversation_id): + """stop_goal returns 404 when the conversation is unknown.""" + mock_conversation_service.get_event_service.return_value = None + + client.app.dependency_overrides[get_conversation_service] = ( + lambda: mock_conversation_service + ) + try: + response = client.post(f"/api/conversations/{sample_conversation_id}/goal/stop") + assert response.status_code == 404 + finally: + client.app.dependency_overrides.clear() + + +def test_resume_goal_success( + client, mock_conversation_service, mock_event_service, sample_conversation_id +): + """resume_goal endpoint forwards to the event service.""" + mock_conversation_service.get_event_service.return_value = mock_event_service + mock_event_service.resume_goal.return_value = None + + client.app.dependency_overrides[get_conversation_service] = ( + lambda: 
mock_conversation_service + ) + try: + response = client.post( + f"/api/conversations/{sample_conversation_id}/goal/resume" + ) + assert response.status_code == 200 + assert response.json()["success"] is True + mock_event_service.resume_goal.assert_awaited_once() + finally: + client.app.dependency_overrides.clear() + + +def test_resume_goal_not_found( + client, mock_conversation_service, sample_conversation_id +): + """resume_goal returns 404 when the conversation is unknown.""" + mock_conversation_service.get_event_service.return_value = None + + client.app.dependency_overrides[get_conversation_service] = ( + lambda: mock_conversation_service + ) + try: + response = client.post( + f"/api/conversations/{sample_conversation_id}/goal/resume" + ) + assert response.status_code == 404 + finally: + client.app.dependency_overrides.clear() + + +def test_resume_goal_no_resumable( + client, mock_conversation_service, mock_event_service, sample_conversation_id +): + """resume_goal returns 400 when there is nothing to resume.""" + mock_conversation_service.get_event_service.return_value = mock_event_service + mock_event_service.resume_goal.side_effect = ValueError("no_resumable_goal") + + client.app.dependency_overrides[get_conversation_service] = ( + lambda: mock_conversation_service + ) + try: + response = client.post( + f"/api/conversations/{sample_conversation_id}/goal/resume" + ) + assert response.status_code == 400 + finally: + client.app.dependency_overrides.clear() + + def test_switch_acp_model_success( client, mock_conversation_service, mock_event_service, sample_conversation_id ): diff --git a/tests/agent_server/test_goal_loop.py b/tests/agent_server/test_goal_loop.py new file mode 100644 index 0000000000..85ec07f961 --- /dev/null +++ b/tests/agent_server/test_goal_loop.py @@ -0,0 +1,357 @@ +"""Tests for the /goal driver loop in EventService (the agent-server side). 
+ +These drive a real EventService + LocalConversation with a scripted TestLLM +agent (each run finishes on a content-only reply) and a separate scripted judge. +""" + +import asyncio +from threading import Event +from typing import cast +from unittest.mock import MagicMock, patch +from uuid import uuid4 + +import pytest +from pydantic import PrivateAttr, SecretStr + +from openhands.agent_server.event_service import EventService +from openhands.agent_server.models import StoredConversation +from openhands.sdk.agent import Agent +from openhands.sdk.event.conversation_state import ConversationStateUpdateEvent +from openhands.sdk.llm import LLM, Message, TextContent +from openhands.sdk.testing import TestLLM +from openhands.sdk.workspace import LocalWorkspace + + +def _scripted(*texts: str, usage_id: str) -> TestLLM: + return TestLLM.from_messages( + [Message(role="assistant", content=[TextContent(text=t)]) for t in texts], + usage_id=usage_id, + ) + + +class _GatedLLM(TestLLM): + """Judge LLM that blocks until released, signalling when it has entered. + + Lets a test deterministically catch the goal loop mid-audit (no run in + flight) and stop it. 
+ """ + + _gate: Event = PrivateAttr(default_factory=Event) + _entered: Event = PrivateAttr(default_factory=Event) + + def completion( + self, + messages, + tools=None, + _return_metrics=False, + add_security_risk_prediction=False, + on_token=None, + **kwargs, + ): + self._entered.set() + self._gate.wait(timeout=10) + return super().completion( + messages, + tools, + _return_metrics, + add_security_risk_prediction, + on_token, + **kwargs, + ) + + +def _goal_status_updates(event_service: EventService) -> list: + return [ + e.value + for e in event_service.get_conversation()._state.events + if isinstance(e, ConversationStateUpdateEvent) and e.key == "goal" + ] + + +@pytest.fixture +def event_service(tmp_path): + with patch("openhands.sdk.llm.utils.model_info.httpx.get") as mock_get: + mock_get.return_value = MagicMock(json=lambda: {"data": []}) + service = EventService( + stored=StoredConversation( + id=uuid4(), + agent=Agent( + llm=LLM( + usage_id="agent", model="test-model", api_key=SecretStr("x") + ), + tools=[], + ), + workspace=LocalWorkspace(working_dir=str(tmp_path / "workspace")), + ), + conversations_dir=tmp_path / "conversations", + ) + yield service + + +async def _start(service: EventService, tmp_path, *agent_turns: str) -> None: + """Start the service and install a scripted agent LLM (one reply per run).""" + (tmp_path / "workspace").mkdir(exist_ok=True) + await service.start() + service.get_conversation().switch_llm(_scripted(*agent_turns, usage_id="agent")) + + +_DONE = '{"score": 1.0, "complete": true, "missing": ""}' +_NOT_DONE = '{"score": 0.2, "complete": false, "missing": "tests"}' + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + ("agent_turns", "verdicts", "max_iterations", "status", "iterations"), + [ + (("turn 1", "turn 2"), (_NOT_DONE, _DONE), 5, "complete", 2), + (("turn 1", "turn 2"), (_NOT_DONE, _NOT_DONE), 2, "capped", 2), + ], + ids=["completes-after-two-rounds", "caps-at-max"], +) +async def test_goal_outcomes( + event_service, 
tmp_path, agent_turns, verdicts, max_iterations, status, iterations +): + await _start(event_service, tmp_path, *agent_turns) + judge = _scripted(*verdicts, usage_id="judge") + try: + await event_service.start_goal( + "build x", judge_llm=judge, max_iterations=max_iterations + ) + await asyncio.wait_for(event_service._goal_task, timeout=15) + + outcome = event_service._goal_outcome + assert outcome is not None + assert outcome.status == status + assert outcome.iterations == iterations + finally: + await event_service.close() + + +@pytest.mark.asyncio +async def test_goal_emits_status_events(event_service, tmp_path): + # The loop publishes ConversationStateUpdateEvent(key="goal") at each + # lifecycle point; they are persisted to the shared event log (and streamed + # to subscribers) so a UI can render a progress chip. + await _start(event_service, tmp_path, "did the work") + judge = _scripted( + '{"score": 1.0, "complete": true, "missing": ""}', usage_id="judge" + ) + try: + await event_service.start_goal("build x", judge_llm=judge, max_iterations=3) + await asyncio.wait_for(event_service._goal_task, timeout=15) + + updates = [ + e.value + for e in event_service.get_conversation()._state.events + if isinstance(e, ConversationStateUpdateEvent) and e.key == "goal" + ] + assert updates, "expected goal-status events" + # First update: running + active. Last: complete + inactive. + assert updates[0]["active"] is True + assert updates[0]["status"] == "running" + assert updates[-1]["active"] is False + assert updates[-1]["status"] == "complete" + assert updates[-1]["iteration"] == 1 + assert updates[-1]["objective"] == "build x" + finally: + await event_service.close() + + +@pytest.mark.asyncio +async def test_goal_defaults_judge_to_agent_llm(event_service, tmp_path): + # No judge_llm passed -> the agent's own LLM is used as the judge, so its + # scripted queue serves both the agent turn and the verdict. This is the + # path the POST /goal endpoint always takes. 
+ await _start( + event_service, + tmp_path, + "did the work", + '{"score": 1.0, "complete": true, "missing": ""}', + ) + try: + await event_service.start_goal("build x", max_iterations=3) + await asyncio.wait_for(event_service._goal_task, timeout=15) + + outcome = event_service._goal_outcome + assert outcome is not None + assert outcome.status == "complete" + assert outcome.iterations == 1 + finally: + await event_service.close() + + +@pytest.mark.asyncio +async def test_start_goal_rejects_empty_objective(event_service, tmp_path): + await _start(event_service, tmp_path, "noop") + judge = _scripted("{}", usage_id="judge") + try: + with pytest.raises(ValueError): + await event_service.start_goal(" ", judge_llm=judge) + finally: + await event_service.close() + + +@pytest.mark.asyncio +async def test_start_goal_rejects_concurrent_goal(event_service, tmp_path): + await _start(event_service, tmp_path, "noop") + judge = _scripted("{}", usage_id="judge") + try: + # Occupy the goal slot with a task that won't finish on its own. + event_service._goal_task = asyncio.create_task(asyncio.sleep(10)) + with pytest.raises(ValueError, match="goal_already_running"): + await event_service.start_goal("build x", judge_llm=judge) + finally: + event_service._goal_task.cancel() + event_service._goal_task = None + await event_service.close() + + +@pytest.mark.asyncio +async def test_stop_goal_when_idle_returns_false(event_service, tmp_path): + await _start(event_service, tmp_path, "noop") + try: + assert await event_service.stop_goal() is False + finally: + await event_service.close() + + +@pytest.mark.asyncio +async def test_user_message_stops_running_goal(event_service, tmp_path): + # A user message (the normal chat path, _from_goal=False) cancels a running + # goal loop before being processed. 
+ await _start(event_service, tmp_path, "noop") + try: + event_service._goal_task = asyncio.create_task(asyncio.sleep(30)) + await event_service.send_message( + Message(role="user", content=[TextContent(text="hello")]), run=False + ) + assert event_service._goal_task.done() + finally: + await event_service.close() + + +@pytest.mark.asyncio +async def test_stop_running_goal_emits_interrupted(event_service, tmp_path): + await _start(event_service, tmp_path, "did the work") + judge = cast( + _GatedLLM, + _GatedLLM.from_messages( + [ + Message( + role="assistant", + content=[TextContent(text='{"score": 0.2, "complete": false}')], + ) + ], + usage_id="judge", + ), + ) + try: + await event_service.start_goal("build x", judge_llm=judge, max_iterations=5) + # Wait until the judge is blocked: the goal is mid-audit, no run in flight. + loop = asyncio.get_running_loop() + await loop.run_in_executor(None, judge._entered.wait, 5.0) + + assert await event_service.stop_goal() is True + updates = _goal_status_updates(event_service) + assert updates[-1]["status"] == "interrupted" + assert updates[-1]["active"] is False + assert updates[-1]["iteration"] == 1 + finally: + judge._gate.set() # release the orphaned judge thread for cleanup + await event_service.close() + + +@pytest.mark.asyncio +async def test_resume_from_interrupted_status(event_service, tmp_path): + await _start(event_service, tmp_path, "resumed and finished") + # Simulate a goal that was interrupted at round 1 of 5. 
+ conversation = event_service.get_conversation() + with conversation._state: + conversation._on_event( + ConversationStateUpdateEvent( + key="goal", + value={ + "active": False, + "status": "interrupted", + "iteration": 1, + "max_iterations": 5, + "objective": "build x", + "verdict": None, + }, + ) + ) + judge = _scripted( + '{"score": 1.0, "complete": true, "missing": ""}', usage_id="judge" + ) + try: + await event_service.resume_goal(judge_llm=judge) + await asyncio.wait_for(event_service._goal_task, timeout=15) + + outcome = event_service._goal_outcome + assert outcome is not None + assert outcome.status == "complete" + assert outcome.iterations == 2 # resumed from round 1 -> completed at round 2 + finally: + await event_service.close() + + +@pytest.mark.asyncio +async def test_resume_without_resumable_goal_raises(event_service, tmp_path): + await _start(event_service, tmp_path, "noop") + judge = _scripted("{}", usage_id="judge") + try: + with pytest.raises(ValueError, match="no_resumable_goal"): + await event_service.resume_goal(judge_llm=judge) + finally: + await event_service.close() + + +@pytest.mark.asyncio +async def test_resume_after_completed_goal_raises(event_service, tmp_path): + # A completed (or capped) goal is not resumable. + await _start(event_service, tmp_path, "noop") + conversation = event_service.get_conversation() + with conversation._state: + conversation._on_event( + ConversationStateUpdateEvent( + key="goal", + value={ + "active": False, + "status": "complete", + "iteration": 2, + "max_iterations": 5, + "objective": "build x", + "verdict": None, + }, + ) + ) + judge = _scripted("{}", usage_id="judge") + try: + with pytest.raises(ValueError, match="no_resumable_goal"): + await event_service.resume_goal(judge_llm=judge) + finally: + await event_service.close() + + +@pytest.mark.asyncio +async def test_goal_halts_on_run_error_as_interrupted(event_service, tmp_path): + # Simulate "out of credits": the agent's run raises. 
The goal must record an + # interrupted (resumable) status, not die silently with no outcome. + (tmp_path / "workspace").mkdir(exist_ok=True) + await event_service.start() + event_service.get_conversation().switch_llm( + TestLLM.from_messages([RuntimeError("out of credits")], usage_id="agent") + ) + judge = _scripted( + '{"score": 0.1, "complete": false, "missing": "x"}', usage_id="judge" + ) + try: + await event_service.start_goal("build x", judge_llm=judge, max_iterations=5) + await asyncio.wait_for(event_service._goal_task, timeout=15) + + updates = _goal_status_updates(event_service) + assert updates[-1]["status"] == "interrupted" + assert updates[-1]["active"] is False + assert event_service._goal_outcome is None + finally: + await event_service.close() From 2e37bea94696664b1f48cdf8191f98af40df8584 Mon Sep 17 00:00:00 2001 From: VascoSch92 Date: Wed, 17 Jun 2026 14:58:55 +0200 Subject: [PATCH 3/9] =?UTF-8?q?chore(sdk):=20tidy=20/goal=20core=20?= =?UTF-8?q?=E2=80=94=20temp-dir=20demo=20persistence,=20drop=20unneeded=20?= =?UTF-8?q?future=20imports?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .pr/goal_shared_history.py | 4 +--- openhands-sdk/openhands/sdk/conversation/goal/controller.py | 2 -- openhands-sdk/openhands/sdk/conversation/goal/judge.py | 2 -- openhands-sdk/openhands/sdk/conversation/goal/prompts.py | 2 -- 4 files changed, 1 insertion(+), 9 deletions(-) diff --git a/.pr/goal_shared_history.py b/.pr/goal_shared_history.py index 9dc6495847..a0aae59ea5 100644 --- a/.pr/goal_shared_history.py +++ b/.pr/goal_shared_history.py @@ -113,10 +113,8 @@ def main() -> None: agent, judge_llm, main_message, objective, max_iters = build(real) workspace = tempfile.mkdtemp(prefix="goal-demo-") # visualizer=None keeps the output focused on the proof below. 
- from pathlib import Path - conversation = Conversation( - agent=agent, workspace=workspace, visualizer=None, persistence_dir=Path.cwd() + agent=agent, workspace=workspace, visualizer=None, persistence_dir=workspace ) convo_id = conversation.id diff --git a/openhands-sdk/openhands/sdk/conversation/goal/controller.py b/openhands-sdk/openhands/sdk/conversation/goal/controller.py index c2bae7140b..3ea92524bf 100644 --- a/openhands-sdk/openhands/sdk/conversation/goal/controller.py +++ b/openhands-sdk/openhands/sdk/conversation/goal/controller.py @@ -7,8 +7,6 @@ That split lets the sync and async drivers share identical decision logic. """ -from __future__ import annotations - from collections.abc import Sequence from typing import Literal diff --git a/openhands-sdk/openhands/sdk/conversation/goal/judge.py b/openhands-sdk/openhands/sdk/conversation/goal/judge.py index 82b34d1a05..9bdc73a009 100644 --- a/openhands-sdk/openhands/sdk/conversation/goal/judge.py +++ b/openhands-sdk/openhands/sdk/conversation/goal/judge.py @@ -6,8 +6,6 @@ can equally back a status command, a stop hook, or a server endpoint. 
""" -from __future__ import annotations - import contextlib import json import re diff --git a/openhands-sdk/openhands/sdk/conversation/goal/prompts.py b/openhands-sdk/openhands/sdk/conversation/goal/prompts.py index 7973ce8a6a..c24bbd2214 100644 --- a/openhands-sdk/openhands/sdk/conversation/goal/prompts.py +++ b/openhands-sdk/openhands/sdk/conversation/goal/prompts.py @@ -1,7 +1,5 @@ """Prompt text for the ``/goal`` command's judge and continuation messages.""" -from __future__ import annotations - from typing import Final From c96ebb4401c618b272cad621d7801325dd1be41b Mon Sep 17 00:00:00 2001 From: VascoSch92 Date: Thu, 18 Jun 2026 10:01:49 +0200 Subject: [PATCH 4/9] feat(goal): carry per-round judge verdict on goal status events --- .../openhands/agent_server/event_service.py | 4 ++- .../sdk/conversation/goal/controller.py | 5 +++- tests/agent_server/test_goal_loop.py | 26 +++++++++++++++++++ .../sdk/conversation/goal/test_controller.py | 5 ++++ 4 files changed, 38 insertions(+), 2 deletions(-) diff --git a/openhands-agent-server/openhands/agent_server/event_service.py b/openhands-agent-server/openhands/agent_server/event_service.py index 79fd969a91..0613682c3f 100644 --- a/openhands-agent-server/openhands/agent_server/event_service.py +++ b/openhands-agent-server/openhands/agent_server/event_service.py @@ -1132,7 +1132,9 @@ def _persist() -> None: step.outcome.iterations, ) return - await _emit_status(active=True, status="running") + # Carry the round's verdict so a UI can show per-round judge + # feedback (score + what's missing), not just the final one. 
+ await _emit_status(active=True, status="running", verdict=step.verdict) await self.send_message( _user(step.followup), run=False, _from_goal=True ) diff --git a/openhands-sdk/openhands/sdk/conversation/goal/controller.py b/openhands-sdk/openhands/sdk/conversation/goal/controller.py index 3ea92524bf..6bc367dc7d 100644 --- a/openhands-sdk/openhands/sdk/conversation/goal/controller.py +++ b/openhands-sdk/openhands/sdk/conversation/goal/controller.py @@ -60,6 +60,9 @@ class GoalContinue(BaseModel): """Decision to keep going: send ``followup`` before the next run.""" followup: str + verdict: GoalVerdict = Field( + description="The judge's verdict for the round that just finished." + ) class GoalDone(BaseModel): @@ -127,4 +130,4 @@ def on_run_finished(self, events: Sequence[Event]) -> GoalStep: ) missing = verdict.missing or "Some requirements are not yet verified." followup = FOLLOWUP_PROMPT.format(iteration=self.iteration, missing=missing) - return GoalContinue(followup=followup) + return GoalContinue(followup=followup, verdict=verdict) diff --git a/tests/agent_server/test_goal_loop.py b/tests/agent_server/test_goal_loop.py index 85ec07f961..7d3e6d97bb 100644 --- a/tests/agent_server/test_goal_loop.py +++ b/tests/agent_server/test_goal_loop.py @@ -157,6 +157,32 @@ async def test_goal_emits_status_events(event_service, tmp_path): await event_service.close() +@pytest.mark.asyncio +async def test_goal_emits_per_round_verdicts(event_service, tmp_path): + # Each continuing round publishes its judge verdict (score + missing) on the + # running status event, so a UI can show per-round feedback, not just the + # terminal verdict. 
+ await _start(event_service, tmp_path, "turn 1", "turn 2") + judge = _scripted(_NOT_DONE, _DONE, usage_id="judge") + try: + await event_service.start_goal("build x", judge_llm=judge, max_iterations=5) + await asyncio.wait_for(event_service._goal_task, timeout=15) + + updates = _goal_status_updates(event_service) + # The kickoff update (iteration 0) has no verdict yet. + kickoff = next(u for u in updates if u["iteration"] == 0) + assert kickoff["verdict"] is None + # The mid-loop "running" update for round 1 carries the round's verdict. + round_one = next( + u for u in updates if u["status"] == "running" and u["iteration"] == 1 + ) + assert round_one["verdict"] is not None + assert round_one["verdict"]["score"] == 0.2 + assert round_one["verdict"]["missing"] == "tests" + finally: + await event_service.close() + + @pytest.mark.asyncio async def test_goal_defaults_judge_to_agent_llm(event_service, tmp_path): # No judge_llm passed -> the agent's own LLM is used as the judge, so its diff --git a/tests/sdk/conversation/goal/test_controller.py b/tests/sdk/conversation/goal/test_controller.py index 49b60cd977..c99f4c9d70 100644 --- a/tests/sdk/conversation/goal/test_controller.py +++ b/tests/sdk/conversation/goal/test_controller.py @@ -34,6 +34,11 @@ def test_continue_when_incomplete(): assert isinstance(step, GoalContinue) assert "tests" in step.followup assert controller.iteration == 1 + # The verdict for the round just finished rides along so a driver can + # publish per-round judge feedback, not just the terminal one. 
+ assert step.verdict.score == 0.2 + assert step.verdict.complete is False + assert step.verdict.missing == "tests" def test_done_when_complete(): From a7f463b08b76b1727faa337027cac2336fb08570 Mon Sep 17 00:00:00 2001 From: VascoSch92 Date: Thu, 18 Jun 2026 13:55:02 +0200 Subject: [PATCH 5/9] fix(agent-server): emit interrupted status when the goal loop hits an unexpected error --- .../openhands/agent_server/event_service.py | 6 +++++ tests/agent_server/test_goal_loop.py | 23 +++++++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/openhands-agent-server/openhands/agent_server/event_service.py b/openhands-agent-server/openhands/agent_server/event_service.py index 0613682c3f..fa08eb44fb 100644 --- a/openhands-agent-server/openhands/agent_server/event_service.py +++ b/openhands-agent-server/openhands/agent_server/event_service.py @@ -1148,6 +1148,12 @@ def _persist() -> None: raise except Exception: logger.exception("Goal loop failed") + # An unexpected failure (judge LLM error, controller bug, ...) leaves + # the loop dead: record an interrupted status (resumable) so the UI + # doesn't show it running. Skip during close(), like the cancel path. + if not self._closing: + with suppress(Exception): + await _emit_status(active=False, status="interrupted") finally: self._goal_task = None diff --git a/tests/agent_server/test_goal_loop.py b/tests/agent_server/test_goal_loop.py index 7d3e6d97bb..ddccffbadc 100644 --- a/tests/agent_server/test_goal_loop.py +++ b/tests/agent_server/test_goal_loop.py @@ -381,3 +381,26 @@ async def test_goal_halts_on_run_error_as_interrupted(event_service, tmp_path): assert event_service._goal_outcome is None finally: await event_service.close() + + +@pytest.mark.asyncio +async def test_goal_emits_interrupted_on_unexpected_error(event_service, tmp_path): + # A judge LLM that *raises* (e.g. 
a network error) crashes the loop via the + # generic `except Exception` path -- distinct from a run error surfaced as + # ConversationExecutionStatus.ERROR (test above). The loop must still record + # a terminal interrupted (resumable) status; otherwise the last persisted + # event stays active=True/running and the UI shows a dead goal as running. + await _start(event_service, tmp_path, "did the work") + judge = TestLLM.from_messages( + [RuntimeError("judge network error")], usage_id="judge" + ) + try: + await event_service.start_goal("build x", judge_llm=judge, max_iterations=5) + await asyncio.wait_for(event_service._goal_task, timeout=15) + + updates = _goal_status_updates(event_service) + assert updates[-1]["status"] == "interrupted" + assert updates[-1]["active"] is False + assert event_service._goal_outcome is None + finally: + await event_service.close() From 2d4dae6457e363d73abae758b6cd4f76ce4525db Mon Sep 17 00:00:00 2001 From: VascoSch92 Date: Thu, 18 Jun 2026 14:53:39 +0200 Subject: [PATCH 6/9] fix(agent-server): refuse /goal when a conversation run is already in flight --- .../openhands/agent_server/event_service.py | 47 +++++++++++--- tests/agent_server/test_goal_loop.py | 65 +++++++++++++++++++ 2 files changed, 104 insertions(+), 8 deletions(-) diff --git a/openhands-agent-server/openhands/agent_server/event_service.py b/openhands-agent-server/openhands/agent_server/event_service.py index fa08eb44fb..1bf5f3460f 100644 --- a/openhands-agent-server/openhands/agent_server/event_service.py +++ b/openhands-agent-server/openhands/agent_server/event_service.py @@ -1034,16 +1034,34 @@ async def start_goal( """ if not self._conversation or self._closing: raise ValueError("inactive_service") - if self._goal_task is not None and not self._goal_task.done(): - raise ValueError("goal_already_running") if judge_llm is None: judge_llm = getattr(self._conversation.agent, "llm", None) if judge_llm is None: raise ValueError("no_judge_llm") # GoalController validates 
the objective/max_iterations (raises ValueError). controller = GoalController(objective, judge_llm, max_iterations=max_iterations) - self._goal_outcome = None - self._goal_task = asyncio.create_task(self._run_goal(controller)) + # Under _run_lock, atomically refuse a concurrent goal or an in-flight run + # (mirrors run()'s guard); else /goal slips in beside the active run and + # ends up judging that unrelated transcript. + async with self._run_lock: + if self._closing: + raise ValueError("inactive_service") + if self._goal_task is not None and not self._goal_task.done(): + raise ValueError("goal_already_running") + # _run_task first: a live run holds the state lock across its step, + # so reading execution status would block behind it. + if (self._run_task is not None and not self._run_task.done()) or ( + await self._get_execution_status() + == ConversationExecutionStatus.RUNNING + ): + raise ValueError("conversation_already_running") + # Re-check after the await above: close() runs without _run_lock, so + # it may have begun teardown meanwhile (mirrors run()'s post-status + # _closing re-check) -- avoid spawning a task close() won't cancel. 
+ if self._closing: + raise ValueError("inactive_service") + self._goal_outcome = None + self._goal_task = asyncio.create_task(self._run_goal(controller)) async def _run_goal( self, controller: GoalController, *, resume: bool = False @@ -1201,8 +1219,6 @@ async def resume_goal( """ if not self._conversation or self._closing: raise ValueError("inactive_service") - if self._goal_task is not None and not self._goal_task.done(): - raise ValueError("goal_already_running") loop = asyncio.get_running_loop() last = await loop.run_in_executor(None, self._last_goal_status) if last is None or last.get("status") in ("complete", "capped"): @@ -1217,8 +1233,23 @@ async def resume_goal( max_iterations=max_iterations or int(last["max_iterations"]), ) controller.iteration = int(last["iteration"]) - self._goal_outcome = None - self._goal_task = asyncio.create_task(self._run_goal(controller, resume=True)) + # Same busy-guard as start_goal: refuse a concurrent goal or in-flight run. + async with self._run_lock: + if self._closing: + raise ValueError("inactive_service") + if self._goal_task is not None and not self._goal_task.done(): + raise ValueError("goal_already_running") + if (self._run_task is not None and not self._run_task.done()) or ( + await self._get_execution_status() + == ConversationExecutionStatus.RUNNING + ): + raise ValueError("conversation_already_running") + if self._closing: # see start_goal: close() may have begun teardown + raise ValueError("inactive_service") + self._goal_outcome = None + self._goal_task = asyncio.create_task( + self._run_goal(controller, resume=True) + ) async def respond_to_confirmation(self, request: ConfirmationResponseRequest): if request.accept: diff --git a/tests/agent_server/test_goal_loop.py b/tests/agent_server/test_goal_loop.py index ddccffbadc..949b0cdcfe 100644 --- a/tests/agent_server/test_goal_loop.py +++ b/tests/agent_server/test_goal_loop.py @@ -404,3 +404,68 @@ async def 
test_goal_emits_interrupted_on_unexpected_error(event_service, tmp_pat assert event_service._goal_outcome is None finally: await event_service.close() + + +@pytest.mark.asyncio +async def test_start_goal_rejected_while_run_active(event_service, tmp_path): + # /goal must refuse with conversation_already_running (-> 409) when a normal + # run is already in flight, instead of slipping in beside it and judging that + # run's unrelated transcript. Uses a real gated run, not a placeholder task. + (tmp_path / "workspace").mkdir(exist_ok=True) + await event_service.start() + agent_llm = cast( + _GatedLLM, + _GatedLLM.from_messages( + [Message(role="assistant", content=[TextContent(text="working")])], + usage_id="agent", + ), + ) + event_service.get_conversation().switch_llm(agent_llm) + judge = _scripted("{}", usage_id="judge") + try: + # Kick off a real background run and wait until the agent is blocked + # mid-LLM-call, so the run is genuinely in flight. + await event_service.send_message( + Message(role="user", content=[TextContent(text="do a thing")]), run=True + ) + loop = asyncio.get_running_loop() + await loop.run_in_executor(None, agent_llm._entered.wait, 5.0) + + with pytest.raises(ValueError, match="conversation_already_running"): + await event_service.start_goal("build x", judge_llm=judge) + assert event_service._goal_task is None + finally: + agent_llm._gate.set() # release the blocked run for cleanup + await event_service.close() + + +@pytest.mark.asyncio +async def test_resume_goal_rejected_while_run_active(event_service, tmp_path): + # resume_goal shares start_goal's guard. A placeholder _run_task stands in for + # an active run: a real gated run would hold the state lock that resume_goal's + # _last_goal_status() read needs first, which is a separate concern. 
+ await _start(event_service, tmp_path, "noop") + conversation = event_service.get_conversation() + with conversation._state: + conversation._on_event( + ConversationStateUpdateEvent( + key="goal", + value={ + "active": False, + "status": "interrupted", + "iteration": 1, + "max_iterations": 5, + "objective": "build x", + "verdict": None, + }, + ) + ) + judge = _scripted("{}", usage_id="judge") + try: + event_service._run_task = asyncio.create_task(asyncio.sleep(10)) + with pytest.raises(ValueError, match="conversation_already_running"): + await event_service.resume_goal(judge_llm=judge) + finally: + event_service._run_task.cancel() + event_service._run_task = None + await event_service.close() From 719521194dd23c60e9145d864b58488001004f5b Mon Sep 17 00:00:00 2001 From: VascoSch92 Date: Thu, 18 Jun 2026 15:05:18 +0200 Subject: [PATCH 7/9] fix(agent-server): refuse /goal when a conversation run is already in flight --- tests/agent_server/test_goal_loop.py | 30 +++++++--------------------- 1 file changed, 7 insertions(+), 23 deletions(-) diff --git a/tests/agent_server/test_goal_loop.py b/tests/agent_server/test_goal_loop.py index 949b0cdcfe..5a5f5f5803 100644 --- a/tests/agent_server/test_goal_loop.py +++ b/tests/agent_server/test_goal_loop.py @@ -410,40 +410,24 @@ async def test_goal_emits_interrupted_on_unexpected_error(event_service, tmp_pat async def test_start_goal_rejected_while_run_active(event_service, tmp_path): # /goal must refuse with conversation_already_running (-> 409) when a normal # run is already in flight, instead of slipping in beside it and judging that - # run's unrelated transcript. Uses a real gated run, not a placeholder task. 
- (tmp_path / "workspace").mkdir(exist_ok=True) - await event_service.start() - agent_llm = cast( - _GatedLLM, - _GatedLLM.from_messages( - [Message(role="assistant", content=[TextContent(text="working")])], - usage_id="agent", - ), - ) - event_service.get_conversation().switch_llm(agent_llm) + # run's unrelated transcript. A placeholder _run_task stands in for the active + # run (same pattern as test_start_goal_rejects_concurrent_goal). + await _start(event_service, tmp_path, "noop") judge = _scripted("{}", usage_id="judge") try: - # Kick off a real background run and wait until the agent is blocked - # mid-LLM-call, so the run is genuinely in flight. - await event_service.send_message( - Message(role="user", content=[TextContent(text="do a thing")]), run=True - ) - loop = asyncio.get_running_loop() - await loop.run_in_executor(None, agent_llm._entered.wait, 5.0) - + event_service._run_task = asyncio.create_task(asyncio.sleep(10)) with pytest.raises(ValueError, match="conversation_already_running"): await event_service.start_goal("build x", judge_llm=judge) assert event_service._goal_task is None finally: - agent_llm._gate.set() # release the blocked run for cleanup + event_service._run_task.cancel() + event_service._run_task = None await event_service.close() @pytest.mark.asyncio async def test_resume_goal_rejected_while_run_active(event_service, tmp_path): - # resume_goal shares start_goal's guard. A placeholder _run_task stands in for - # an active run: a real gated run would hold the state lock that resume_goal's - # _last_goal_status() read needs first, which is a separate concern. + # resume_goal shares start_goal's guard; same placeholder _run_task approach. 
await _start(event_service, tmp_path, "noop") conversation = event_service.get_conversation() with conversation._state: From e04175416dcb3c2e46a72593b7ac9438ed675927 Mon Sep 17 00:00:00 2001 From: allhands-bot Date: Thu, 18 Jun 2026 13:23:23 +0000 Subject: [PATCH 8/9] chore: Remove PR-only artifacts [automated] --- .pr/README.md | 64 --------------- .pr/goal_shared_history.py | 164 ------------------------------------- 2 files changed, 228 deletions(-) delete mode 100644 .pr/README.md delete mode 100644 .pr/goal_shared_history.py diff --git a/.pr/README.md b/.pr/README.md deleted file mode 100644 index 6bc2ecf9ff..0000000000 --- a/.pr/README.md +++ /dev/null @@ -1,64 +0,0 @@ -# `/goal` shared-history demo - -Proves that the `/goal` loop writes into the **same** conversation history as the -main chat — it drives the `Conversation` you pass in, it does **not** fork or -create a sidecar conversation. - -## Run - -```bash -# Deterministic, no network (scripted TestLLMs) — always works: -uv run python .pr/goal_shared_history.py - -# Real agent doing real work (creates files, runs pytest) — opt in explicitly: -GOAL_DEMO_REAL=1 LLM_API_KEY=sk-... LLM_MODEL=gpt-5.5 \ - uv run python .pr/goal_shared_history.py -``` - -## What to look for - -The script sends a normal "main conversation" message, then runs `run_goal(...)` -on the **same** `Conversation`. The `PROOF` section at the end shows: - -``` -same conversation id .............. True -only one Conversation object ...... True (no fork was created) -event log GREW in place ........... 3 -> 7 -main-convo events still present ... True -goal objective is in THIS log ..... True -goal outcome ...................... complete (after 2 round(s)) -``` - -i.e. the goal's objective, the agent's work, the judge-driven followups, and the -completion are all appended to the **one** `conversation.state.events` log under -the **one** `conversation.id` — alongside (not replacing) the main-convo events. 
- -## Seeing what the LLM is doing - -The demo passes `visualizer=None` to keep the proof output clean. To watch the -agent's activity: - -- **Live**: drop `visualizer=None` (the default is `DefaultConversationVisualizer`), - and every event — messages, tool calls, observations — prints as it happens. -- **After the fact**: the script ends with a `REPLAY` section that renders the - saved history through the visualizer. Because every turn is persisted in - `conversation.state.events`, you can replay it any time: - - ```python - from openhands.sdk.conversation.visualizer import DefaultConversationVisualizer - viz = DefaultConversationVisualizer() - for event in conversation.state.events: - viz.on_event(event) - ``` - -In the deterministic (no-key) run the agent only emits scripted text, so you see -messages. In real mode (`GOAL_DEMO_REAL=1`) you also see the actual terminal -commands, file edits, and `pytest` output the agent runs. - -## How this maps to the agent server - -`run_goal` (used here) and the agent server's `EventService.start_goal` use the -same mechanism: they drive a single `Conversation`/`_conversation`, so every -event lands in that conversation's shared log and streams to subscribers. A -`POST /conversations/{id}/goal` endpoint runs the loop in the background on the -**existing** conversation — same history as the main chat. diff --git a/.pr/goal_shared_history.py b/.pr/goal_shared_history.py deleted file mode 100644 index a0aae59ea5..0000000000 --- a/.pr/goal_shared_history.py +++ /dev/null @@ -1,164 +0,0 @@ -"""Runnable proof that the ``/goal`` loop writes into the SAME conversation history. - -What it does: - 1. Sends a normal "main conversation" message and runs the agent. - 2. Runs a ``/goal`` loop on the *same* ``Conversation`` object. - 3. 
Prints the single shared event log and checks that the main-conversation - events are still there, untouched, with the goal's objective / agent work / - judge-driven followups / completion appended after them. - -The point: ``run_goal`` drives the conversation you pass in (it does not fork or -spin up a sidecar), so everything lands in one ``conversation.state.events`` log -under one ``conversation.id``. The agent-server ``EventService.start_goal`` uses -the same mechanism on its single ``_conversation``, so this proves the property -both paths rely on. - -Run it two ways: - # Deterministic, no network (scripted TestLLMs) -- always works, quick check: - uv run python .pr/goal_shared_history.py - - # Real agent doing real work (creates files, runs pytest) -- opt in explicitly: - GOAL_DEMO_REAL=1 LLM_API_KEY=sk-... LLM_MODEL=gpt-5.5 \ - uv run python .pr/goal_shared_history.py -""" - -import os -import tempfile - -from openhands.sdk import LLM, Agent, Conversation, Tool -from openhands.sdk.conversation.goal import run_goal -from openhands.sdk.conversation.visualizer import DefaultConversationVisualizer -from openhands.sdk.event import LLMConvertibleEvent -from openhands.sdk.llm import Message, TextContent, content_to_str -from openhands.sdk.testing import TestLLM -from openhands.tools.file_editor import FileEditorTool -from openhands.tools.terminal import TerminalTool - - -def dump_history(conversation, title: str) -> list: - """Print the conversation's full event log and return its events.""" - events = list(conversation.state.events) - print(f"\n===== {title} =====") - print(f"conversation id : {conversation.id}") - print(f"total events : {len(events)}") - for i, ev in enumerate(events): - if isinstance(ev, LLMConvertibleEvent): - text = " ".join(content_to_str(ev.to_llm_message().content)) - text = text.strip().replace("\n", " ") - print(f" [{i:>2}] {ev.to_llm_message().role:<9} {text[:96]}") - else: - print(f" [{i:>2}] {type(ev).__name__}") - return events - - 
-def _scripted(*texts: str, usage_id: str) -> TestLLM: - return TestLLM.from_messages( - [Message(role="assistant", content=[TextContent(text=t)]) for t in texts], - usage_id=usage_id, - ) - - -def build(real: bool): - """Return (agent, judge_llm, main_message, objective, max_iterations).""" - if real: - llm = LLM( - usage_id="agent", - model=os.getenv("LLM_MODEL", "gpt-5.5"), - api_key=os.getenv("LLM_API_KEY"), - base_url=os.getenv("LLM_BASE_URL"), - ) - agent = Agent( - llm=llm, - tools=[Tool(name=TerminalTool.name), Tool(name=FileEditorTool.name)], - ) - judge_llm = llm.model_copy(update={"usage_id": "goal-judge"}) - objective = ( - "Create mathx.py with an add(a, b) function and test_mathx.py with a " - "pytest test for it. The goal is complete only when `python -m pytest " - "-q` passes. Finish each turn with the finish tool." - ) - return ( - agent, - judge_llm, - "Say hello and tell me which directory you are in.", - objective, - 5, - ) - - # Deterministic path: scripted agent (one content-only reply per run) + a - # judge that says "not done" once, then "done". - agent = Agent( - llm=_scripted( - "Hello! I am working in the demo workspace.", # main turn - "I drafted mathx.py and a pytest for it.", # goal round 1 - "Fixed it -- mathx.py and test_mathx.py now pass.", # goal round 2 - usage_id="agent", - ), - tools=[], - ) - judge_llm = _scripted( - '{"score": 0.3, "complete": false, "missing": "tests not passing yet"}', - '{"score": 1.0, "complete": true, "missing": ""}', - usage_id="goal-judge", - ) - return agent, judge_llm, "Say hello.", "Make `pytest` pass for mathx.py.", 5 - - -def main() -> None: - # Real mode is explicit opt-in so the deterministic demo always works, - # even when a (possibly stale) LLM_API_KEY is present in the environment. 
- real = os.getenv("GOAL_DEMO_REAL") == "1" - print(f"mode: {'REAL LLM' if real else 'DETERMINISTIC (scripted TestLLM)'}") - - agent, judge_llm, main_message, objective, max_iters = build(real) - workspace = tempfile.mkdtemp(prefix="goal-demo-") - # visualizer=None keeps the output focused on the proof below. - conversation = Conversation( - agent=agent, workspace=workspace, visualizer=None, persistence_dir=workspace - ) - convo_id = conversation.id - - # 1) A normal "main conversation" turn. - conversation.send_message(main_message) - conversation.run() - main_events = dump_history(conversation, "AFTER MAIN CONVERSATION TURN") - main_ids = [ev.id for ev in main_events] - - # 2) A /goal loop on the SAME conversation object. - print(f"\n>>> running /goal: {objective}\n") - outcome = run_goal(conversation, objective, judge_llm, max_iterations=max_iters) - - all_events = dump_history(conversation, "AFTER /goal LOOP (SAME CONVERSATION)") - all_ids = [ev.id for ev in all_events] - - # 3) Prove it is one shared history. - objective_in_log = any( - objective[:20] in " ".join(content_to_str(ev.to_llm_message().content)) - for ev in all_events - if isinstance(ev, LLMConvertibleEvent) - ) - print("\n===== PROOF (shared history) =====") - print(f"same conversation id .............. {conversation.id == convo_id}") - print("only one Conversation object ...... True (no fork was created)") - print(f"event log GREW in place ........... {len(main_ids)} -> {len(all_ids)}") - print(f"main-convo events still present ... {all_ids[: len(main_ids)] == main_ids}") - print(f"goal objective is in THIS log ..... {objective_in_log}") - print( - f"goal outcome ...................... {outcome.status} " - f"(after {outcome.iterations} round(s))" - ) - print(f"\nworkspace: {workspace}") - - # Visualize the whole thing AFTER the fact. 
Because every turn (main + goal) - # is persisted in conversation.state.events, we can replay the conversation - # through the SDK's visualizer at any time -- here, after the run finished. - # (For LIVE output instead, drop `visualizer=None` above; the default - # DefaultConversationVisualizer then prints each event as it happens.) - print("\n===== REPLAY (visualizing the saved conversation) =====") - visualizer = DefaultConversationVisualizer() - for event in conversation.state.events: - visualizer.on_event(event) - - -if __name__ == "__main__": - main() From dc3f8dce022bf0aa364262178491b3b840601d5e Mon Sep 17 00:00:00 2001 From: enyst Date: Thu, 18 Jun 2026 14:38:12 +0000 Subject: [PATCH 9/9] Clarify goal loop naming in agent server Co-authored-by: openhands --- .../agent_server/conversation_router.py | 36 ++--- .../openhands/agent_server/event_service.py | 113 ++++++++-------- .../openhands/agent_server/models.py | 2 +- .../agent_server/test_conversation_router.py | 50 +++---- tests/agent_server/test_goal_loop.py | 124 ++++++++++-------- 5 files changed, 170 insertions(+), 155 deletions(-) diff --git a/openhands-agent-server/openhands/agent_server/conversation_router.py b/openhands-agent-server/openhands/agent_server/conversation_router.py index 0b0057a380..4009c930c1 100644 --- a/openhands-agent-server/openhands/agent_server/conversation_router.py +++ b/openhands-agent-server/openhands/agent_server/conversation_router.py @@ -299,26 +299,26 @@ async def run_conversation( "/{conversation_id}/goal", responses={ 404: {"description": "Item not found"}, - 409: {"description": "Conversation or goal is already running"}, + 409: {"description": "Conversation run or goal loop is already running"}, }, ) -async def start_goal_conversation( +async def start_goal_in_conversation( conversation_id: UUID, request: StartGoalRequest, conversation_service: ConversationService = Depends(get_conversation_service), ) -> Success: - """Start a ``/goal`` driver loop in the background on 
the conversation. + """Start a ``/goal`` loop inside an existing conversation. - Drives the agent toward ``objective``, judging completion after each run and - re-prompting until done or ``max_iterations``. All work lands in the same - conversation history and event stream as the main chat (it is not a fork). + The loop appends messages and starts agent runs in the same conversation + history and event stream as the main chat. It does not create a separate + conversation for the goal or fork the existing one. """ event_service = await conversation_service.get_event_service(conversation_id) if event_service is None: raise HTTPException(status.HTTP_404_NOT_FOUND) try: - await event_service.start_goal( + await event_service.start_goal_loop( request.objective, max_iterations=request.max_iterations ) except ValueError as e: @@ -326,7 +326,7 @@ async def start_goal_conversation( if message in ("conversation_already_running", "goal_already_running"): raise HTTPException( status_code=status.HTTP_409_CONFLICT, - detail="Conversation or goal already running.", + detail="Conversation run or goal loop already running.", ) raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message) @@ -337,15 +337,19 @@ async def start_goal_conversation( "/{conversation_id}/goal/stop", responses={404: {"description": "Item not found"}}, ) -async def stop_goal_conversation( +async def stop_goal_in_conversation( conversation_id: UUID, conversation_service: ConversationService = Depends(get_conversation_service), ) -> Success: - """Stop a running ``/goal`` loop. The goal records a resumable state.""" + """Stop the active ``/goal`` loop inside this conversation. + + This cancels only the background goal loop, not the conversation itself, and + records an ``interrupted`` goal status so ``/goal/resume`` can continue it. 
+ """ event_service = await conversation_service.get_event_service(conversation_id) if event_service is None: raise HTTPException(status.HTTP_404_NOT_FOUND) - await event_service.stop_goal() + await event_service.stop_goal_loop() return Success() @@ -353,26 +357,26 @@ async def stop_goal_conversation( "/{conversation_id}/goal/resume", responses={ 404: {"description": "Item not found"}, - 409: {"description": "Conversation or goal is already running"}, + 409: {"description": "Conversation run or goal loop is already running"}, }, ) -async def resume_goal_conversation( +async def resume_goal_in_conversation( conversation_id: UUID, conversation_service: ConversationService = Depends(get_conversation_service), ) -> Success: - """Resume a previously interrupted ``/goal`` loop from where it left off.""" + """Resume the last interrupted ``/goal`` loop inside this conversation.""" event_service = await conversation_service.get_event_service(conversation_id) if event_service is None: raise HTTPException(status.HTTP_404_NOT_FOUND) try: - await event_service.resume_goal() + await event_service.resume_goal_loop() except ValueError as e: message = str(e) if message in ("conversation_already_running", "goal_already_running"): raise HTTPException( status_code=status.HTTP_409_CONFLICT, - detail="Conversation or goal already running.", + detail="Conversation run or goal loop already running.", ) raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message) diff --git a/openhands-agent-server/openhands/agent_server/event_service.py b/openhands-agent-server/openhands/agent_server/event_service.py index 1bf5f3460f..c99c663892 100644 --- a/openhands-agent-server/openhands/agent_server/event_service.py +++ b/openhands-agent-server/openhands/agent_server/event_service.py @@ -102,9 +102,9 @@ class EventService: _lease_task: asyncio.Task | None = field(default=None, init=False) _external_lease_renewal: bool = field(default=False, init=False) _run_executor: ThreadPoolExecutor 
| None = field(default=None, init=False) - # Background driver task for an in-progress /goal loop, and its last outcome. - _goal_task: asyncio.Task | None = field(default=None, init=False) - _goal_outcome: GoalOutcome | None = field(default=None, init=False) + # Background task for a /goal loop that is running inside this conversation. + _goal_loop_task: asyncio.Task | None = field(default=None, init=False) + _goal_loop_outcome: GoalOutcome | None = field(default=None, init=False) @property def conversation_dir(self): @@ -461,15 +461,14 @@ async def batch_get_events(self, event_ids: list[str]) -> list[Event | None]: return results async def send_message( - self, message: Message, run: bool = False, _from_goal: bool = False + self, message: Message, run: bool = False, _from_goal_loop: bool = False ): if not self._conversation: raise ValueError("inactive_service") - # A user message takes back control: stop any running /goal loop first - # (it persists an "interrupted" status and can be resumed later). The - # goal loop's own messages pass _from_goal=True to skip this. - if not _from_goal: - await self.stop_goal() + # A normal user message supersedes any active /goal loop in this + # conversation. The goal loop's own messages pass _from_goal_loop=True. + if not _from_goal_loop: + await self.stop_goal_loop() explicit_interrupt_generation = self._explicit_interrupt_generation loop = asyncio.get_running_loop() await loop.run_in_executor(None, self._conversation.send_message, message) @@ -1009,19 +1008,19 @@ async def _run_and_publish(): # Create task but don't await it - runs in background self._run_task = asyncio.create_task(_run_and_publish()) - async def start_goal( + async def start_goal_loop( self, objective: str, *, judge_llm: LLM | None = None, max_iterations: int = 10, ) -> None: - """Start a ``/goal`` driver loop in the background on this conversation. + """Start a ``/goal`` loop inside this conversation. 
Sends the objective, runs the agent, and judges completion after each run, re-prompting until the goal is done or ``max_iterations`` is - reached. All work lands in this conversation's shared event history and - streams to subscribers, exactly like a normal run -- it is *not* a fork. + reached. All work stays in this conversation's event history and stream, + exactly like a normal run; this does not create another conversation. Args: objective: The goal to pursue and audit against. @@ -1029,8 +1028,8 @@ async def start_goal( max_iterations: Hard cap on audit rounds before giving up. Raises: - ValueError: If the service is inactive, a goal is already running, - no judge LLM is available, or the objective is empty. + ValueError: If the service is inactive, a goal loop is already + running, no judge LLM is available, or the objective is empty. """ if not self._conversation or self._closing: raise ValueError("inactive_service") @@ -1040,13 +1039,12 @@ async def start_goal( raise ValueError("no_judge_llm") # GoalController validates the objective/max_iterations (raises ValueError). controller = GoalController(objective, judge_llm, max_iterations=max_iterations) - # Under _run_lock, atomically refuse a concurrent goal or an in-flight run - # (mirrors run()'s guard); else /goal slips in beside the active run and - # ends up judging that unrelated transcript. + # Under _run_lock, atomically refuse a concurrent goal loop or active + # conversation run; otherwise /goal could judge an unrelated transcript. async with self._run_lock: if self._closing: raise ValueError("inactive_service") - if self._goal_task is not None and not self._goal_task.done(): + if self._goal_loop_task is not None and not self._goal_loop_task.done(): raise ValueError("goal_already_running") # _run_task first: a live run holds the state lock across its step, # so reading execution status would block behind it. 
@@ -1060,17 +1058,17 @@ async def start_goal( # _closing re-check) -- avoid spawning a task close() won't cancel. if self._closing: raise ValueError("inactive_service") - self._goal_outcome = None - self._goal_task = asyncio.create_task(self._run_goal(controller)) + self._goal_loop_outcome = None + self._goal_loop_task = asyncio.create_task(self._run_goal_loop(controller)) - async def _run_goal( + async def _run_goal_loop( self, controller: GoalController, *, resume: bool = False ) -> None: - """Background driver loop for :meth:`start_goal` / :meth:`resume_goal`. + """Drive one active ``/goal`` loop inside this conversation. - Reuses the SDK's transport-agnostic ``GoalController`` for all decisions; - this method only owns I/O: sending messages, awaiting each background run, - judging off the event loop, and publishing goal-status updates. + Reuses the SDK's transport-agnostic ``GoalController`` for decisions; + this method owns only I/O: sending messages, awaiting each run, judging + off the event loop, and publishing goal-status updates. """ conversation = self._conversation if conversation is None: @@ -1117,7 +1115,7 @@ def _persist() -> None: try: await _emit_status(active=True, status="running") nudge = RESUME_PROMPT if resume else controller.start() - await self.send_message(_user(nudge), run=False, _from_goal=True) + await self.send_message(_user(nudge), run=False, _from_goal_loop=True) while True: try: await self.run() @@ -1138,7 +1136,7 @@ def _persist() -> None: return step = await loop.run_in_executor(None, _snapshot_and_judge) if isinstance(step, GoalDone): - self._goal_outcome = step.outcome + self._goal_loop_outcome = step.outcome await _emit_status( active=False, status=step.outcome.status, @@ -1154,12 +1152,12 @@ def _persist() -> None: # feedback (score + what's missing), not just the final one. 
await _emit_status(active=True, status="running", verdict=step.verdict) await self.send_message( - _user(step.followup), run=False, _from_goal=True + _user(step.followup), run=False, _from_goal_loop=True ) except asyncio.CancelledError: logger.info("Goal loop cancelled") - # An explicit stop / user interjection: record an interrupted status - # (resumable). Skip during close(), which is tearing the service down. + # Explicit stop or user interjection: record a resumable + # interrupted status, except during service teardown. if not self._closing: with suppress(Exception): await _emit_status(active=False, status="interrupted") @@ -1173,15 +1171,16 @@ def _persist() -> None: with suppress(Exception): await _emit_status(active=False, status="interrupted") finally: - self._goal_task = None + self._goal_loop_task = None - async def stop_goal(self) -> bool: - """Stop a running ``/goal`` loop. Returns True if one was stopped. + async def stop_goal_loop(self) -> bool: + """Cancel the active ``/goal`` loop inside this conversation. - The loop records an ``interrupted`` status before exiting, so it can be - resumed later via :meth:`resume_goal`. + Returns True if a loop was active. Unlike ``interrupt()``, this targets + the background goal loop itself and records an ``interrupted`` status so + :meth:`resume_goal_loop` can continue it later. 
""" - task = self._goal_task + task = self._goal_loop_task if task is None or task.done(): return False task.cancel() @@ -1189,7 +1188,7 @@ async def stop_goal(self) -> bool: await task return True - def _last_goal_status(self) -> dict | None: + def _last_goal_loop_status(self) -> dict | None: """Return the most recent goal-status payload, or None if there is none.""" conversation = self._conversation if conversation is None: @@ -1203,24 +1202,24 @@ def _last_goal_status(self) -> dict | None: return event.value if isinstance(event.value, dict) else None return None - async def resume_goal( + async def resume_goal_loop( self, *, judge_llm: LLM | None = None, max_iterations: int | None = None ) -> None: - """Resume a previously interrupted (or crashed) ``/goal`` loop. + """Resume the last interrupted ``/goal`` loop in this conversation. - Reconstructs the goal from the last persisted goal-status event and - continues from the iteration it had reached. Works within a session and - across a server restart (goal-status events are persisted). + Reconstructs the loop from the last persisted goal-status event and + continues from the iteration it had reached. This works within a session + and across a server restart because goal-status events are persisted. Raises: - ValueError: If the service is inactive, a goal is already running, - no judge LLM is available, or there is no resumable goal (none - was ever started, or the last one already completed/capped). + ValueError: If the service is inactive, a goal loop is already + running, no judge LLM is available, or there is no resumable goal + loop because none was started or it already completed/capped. 
""" if not self._conversation or self._closing: raise ValueError("inactive_service") loop = asyncio.get_running_loop() - last = await loop.run_in_executor(None, self._last_goal_status) + last = await loop.run_in_executor(None, self._last_goal_loop_status) if last is None or last.get("status") in ("complete", "capped"): raise ValueError("no_resumable_goal") if judge_llm is None: @@ -1233,22 +1232,22 @@ async def resume_goal( max_iterations=max_iterations or int(last["max_iterations"]), ) controller.iteration = int(last["iteration"]) - # Same busy-guard as start_goal: refuse a concurrent goal or in-flight run. + # Same busy guard as start_goal_loop: refuse a goal loop or active run. async with self._run_lock: if self._closing: raise ValueError("inactive_service") - if self._goal_task is not None and not self._goal_task.done(): + if self._goal_loop_task is not None and not self._goal_loop_task.done(): raise ValueError("goal_already_running") if (self._run_task is not None and not self._run_task.done()) or ( await self._get_execution_status() == ConversationExecutionStatus.RUNNING ): raise ValueError("conversation_already_running") - if self._closing: # see start_goal: close() may have begun teardown + if self._closing: # see start_goal_loop: close() may have begun teardown raise ValueError("inactive_service") - self._goal_outcome = None - self._goal_task = asyncio.create_task( - self._run_goal(controller, resume=True) + self._goal_loop_outcome = None + self._goal_loop_task = asyncio.create_task( + self._run_goal_loop(controller, resume=True) ) async def respond_to_confirmation(self, request: ConfirmationResponseRequest): @@ -1374,11 +1373,11 @@ async def close(self): # Cancel any in-progress /goal loop first so it cannot start a new run # while we drain the current one below. 
- if self._goal_task is not None and not self._goal_task.done(): - self._goal_task.cancel() + if self._goal_loop_task is not None and not self._goal_loop_task.done(): + self._goal_loop_task.cancel() with suppress(asyncio.CancelledError): - await self._goal_task - self._goal_task = None + await self._goal_loop_task + self._goal_loop_task = None if self._lease_task is not None: self._lease_task.cancel() diff --git a/openhands-agent-server/openhands/agent_server/models.py b/openhands-agent-server/openhands/agent_server/models.py index 334c397cc4..f59b875b9a 100644 --- a/openhands-agent-server/openhands/agent_server/models.py +++ b/openhands-agent-server/openhands/agent_server/models.py @@ -500,7 +500,7 @@ class AskAgentResponse(BaseModel): class StartGoalRequest(BaseModel): - """Payload to start a ``/goal`` driver loop on a conversation.""" + """Payload to start a ``/goal`` loop inside a conversation.""" objective: str = Field(description="The goal objective to pursue and audit.") max_iterations: int = Field( diff --git a/tests/agent_server/test_conversation_router.py b/tests/agent_server/test_conversation_router.py index 88aa2c1df9..924b6a82ce 100644 --- a/tests/agent_server/test_conversation_router.py +++ b/tests/agent_server/test_conversation_router.py @@ -1093,12 +1093,12 @@ def test_run_conversation_not_found( client.app.dependency_overrides.clear() -def test_start_goal_success( +def test_start_goal_in_conversation_success( client, mock_conversation_service, mock_event_service, sample_conversation_id ): - """start_goal endpoint forwards the objective to the event service.""" + """/goal endpoint forwards the objective to the event service.""" mock_conversation_service.get_event_service.return_value = mock_event_service - mock_event_service.start_goal.return_value = None + mock_event_service.start_goal_loop.return_value = None client.app.dependency_overrides[get_conversation_service] = ( lambda: mock_conversation_service @@ -1110,17 +1110,17 @@ def 
test_start_goal_success( ) assert response.status_code == 200 assert response.json()["success"] is True - mock_event_service.start_goal.assert_awaited_once_with( + mock_event_service.start_goal_loop.assert_awaited_once_with( "build x", max_iterations=10 ) finally: client.app.dependency_overrides.clear() -def test_start_goal_not_found( +def test_start_goal_in_conversation_not_found( client, mock_conversation_service, sample_conversation_id ): - """start_goal returns 404 when the conversation is unknown.""" + """/goal returns 404 when the conversation is unknown.""" mock_conversation_service.get_event_service.return_value = None client.app.dependency_overrides[get_conversation_service] = ( @@ -1136,12 +1136,12 @@ def test_start_goal_not_found( client.app.dependency_overrides.clear() -def test_start_goal_already_running( +def test_start_goal_in_conversation_rejects_busy_loop( client, mock_conversation_service, mock_event_service, sample_conversation_id ): - """start_goal returns 409 when a goal/run is already in progress.""" + """/goal returns 409 when a goal loop or conversation run is active.""" mock_conversation_service.get_event_service.return_value = mock_event_service - mock_event_service.start_goal.side_effect = ValueError("goal_already_running") + mock_event_service.start_goal_loop.side_effect = ValueError("goal_already_running") client.app.dependency_overrides[get_conversation_service] = ( lambda: mock_conversation_service @@ -1156,12 +1156,12 @@ def test_start_goal_already_running( client.app.dependency_overrides.clear() -def test_stop_goal_success( +def test_stop_goal_in_conversation_success( client, mock_conversation_service, mock_event_service, sample_conversation_id ): - """stop_goal endpoint forwards to the event service.""" + """/goal/stop endpoint forwards to the event service.""" mock_conversation_service.get_event_service.return_value = mock_event_service - mock_event_service.stop_goal.return_value = True + 
mock_event_service.stop_goal_loop.return_value = True client.app.dependency_overrides[get_conversation_service] = ( lambda: mock_conversation_service @@ -1170,13 +1170,15 @@ def test_stop_goal_success( response = client.post(f"/api/conversations/{sample_conversation_id}/goal/stop") assert response.status_code == 200 assert response.json()["success"] is True - mock_event_service.stop_goal.assert_awaited_once() + mock_event_service.stop_goal_loop.assert_awaited_once() finally: client.app.dependency_overrides.clear() -def test_stop_goal_not_found(client, mock_conversation_service, sample_conversation_id): - """stop_goal returns 404 when the conversation is unknown.""" +def test_stop_goal_in_conversation_not_found( + client, mock_conversation_service, sample_conversation_id +): + """/goal/stop returns 404 when the conversation is unknown.""" mock_conversation_service.get_event_service.return_value = None client.app.dependency_overrides[get_conversation_service] = ( @@ -1189,12 +1191,12 @@ def test_stop_goal_not_found(client, mock_conversation_service, sample_conversat client.app.dependency_overrides.clear() -def test_resume_goal_success( +def test_resume_goal_in_conversation_success( client, mock_conversation_service, mock_event_service, sample_conversation_id ): - """resume_goal endpoint forwards to the event service.""" + """/goal/resume endpoint forwards to the event service.""" mock_conversation_service.get_event_service.return_value = mock_event_service - mock_event_service.resume_goal.return_value = None + mock_event_service.resume_goal_loop.return_value = None client.app.dependency_overrides[get_conversation_service] = ( lambda: mock_conversation_service @@ -1205,15 +1207,15 @@ def test_resume_goal_success( ) assert response.status_code == 200 assert response.json()["success"] is True - mock_event_service.resume_goal.assert_awaited_once() + mock_event_service.resume_goal_loop.assert_awaited_once() finally: client.app.dependency_overrides.clear() -def 
test_resume_goal_not_found( +def test_resume_goal_in_conversation_not_found( client, mock_conversation_service, sample_conversation_id ): - """resume_goal returns 404 when the conversation is unknown.""" + """/goal/resume returns 404 when the conversation is unknown.""" mock_conversation_service.get_event_service.return_value = None client.app.dependency_overrides[get_conversation_service] = ( @@ -1228,12 +1230,12 @@ def test_resume_goal_not_found( client.app.dependency_overrides.clear() -def test_resume_goal_no_resumable( +def test_resume_goal_in_conversation_no_resumable( client, mock_conversation_service, mock_event_service, sample_conversation_id ): - """resume_goal returns 400 when there is nothing to resume.""" + """/goal/resume returns 400 when there is nothing to resume.""" mock_conversation_service.get_event_service.return_value = mock_event_service - mock_event_service.resume_goal.side_effect = ValueError("no_resumable_goal") + mock_event_service.resume_goal_loop.side_effect = ValueError("no_resumable_goal") client.app.dependency_overrides[get_conversation_service] = ( lambda: mock_conversation_service diff --git a/tests/agent_server/test_goal_loop.py b/tests/agent_server/test_goal_loop.py index 5a5f5f5803..ecc7dd00b7 100644 --- a/tests/agent_server/test_goal_loop.py +++ b/tests/agent_server/test_goal_loop.py @@ -108,18 +108,18 @@ async def _start(service: EventService, tmp_path, *agent_turns: str) -> None: ], ids=["completes-after-two-rounds", "caps-at-max"], ) -async def test_goal_outcomes( +async def test_goal_loop_outcomes( event_service, tmp_path, agent_turns, verdicts, max_iterations, status, iterations ): await _start(event_service, tmp_path, *agent_turns) judge = _scripted(*verdicts, usage_id="judge") try: - await event_service.start_goal( + await event_service.start_goal_loop( "build x", judge_llm=judge, max_iterations=max_iterations ) - await asyncio.wait_for(event_service._goal_task, timeout=15) + await 
asyncio.wait_for(event_service._goal_loop_task, timeout=15) - outcome = event_service._goal_outcome + outcome = event_service._goal_loop_outcome assert outcome is not None assert outcome.status == status assert outcome.iterations == iterations @@ -128,7 +128,7 @@ async def test_goal_outcomes( @pytest.mark.asyncio -async def test_goal_emits_status_events(event_service, tmp_path): +async def test_goal_loop_emits_status_events(event_service, tmp_path): # The loop publishes ConversationStateUpdateEvent(key="goal") at each # lifecycle point; they are persisted to the shared event log (and streamed # to subscribers) so a UI can render a progress chip. @@ -137,8 +137,10 @@ async def test_goal_emits_status_events(event_service, tmp_path): '{"score": 1.0, "complete": true, "missing": ""}', usage_id="judge" ) try: - await event_service.start_goal("build x", judge_llm=judge, max_iterations=3) - await asyncio.wait_for(event_service._goal_task, timeout=15) + await event_service.start_goal_loop( + "build x", judge_llm=judge, max_iterations=3 + ) + await asyncio.wait_for(event_service._goal_loop_task, timeout=15) updates = [ e.value @@ -158,15 +160,17 @@ async def test_goal_emits_status_events(event_service, tmp_path): @pytest.mark.asyncio -async def test_goal_emits_per_round_verdicts(event_service, tmp_path): +async def test_goal_loop_emits_per_round_verdicts(event_service, tmp_path): # Each continuing round publishes its judge verdict (score + missing) on the # running status event, so a UI can show per-round feedback, not just the # terminal verdict. 
await _start(event_service, tmp_path, "turn 1", "turn 2") judge = _scripted(_NOT_DONE, _DONE, usage_id="judge") try: - await event_service.start_goal("build x", judge_llm=judge, max_iterations=5) - await asyncio.wait_for(event_service._goal_task, timeout=15) + await event_service.start_goal_loop( + "build x", judge_llm=judge, max_iterations=5 + ) + await asyncio.wait_for(event_service._goal_loop_task, timeout=15) updates = _goal_status_updates(event_service) # The kickoff update (iteration 0) has no verdict yet. @@ -184,7 +188,7 @@ async def test_goal_emits_per_round_verdicts(event_service, tmp_path): @pytest.mark.asyncio -async def test_goal_defaults_judge_to_agent_llm(event_service, tmp_path): +async def test_goal_loop_defaults_judge_to_agent_llm(event_service, tmp_path): # No judge_llm passed -> the agent's own LLM is used as the judge, so its # scripted queue serves both the agent turn and the verdict. This is the # path the POST /goal endpoint always takes. @@ -195,10 +199,10 @@ async def test_goal_defaults_judge_to_agent_llm(event_service, tmp_path): '{"score": 1.0, "complete": true, "missing": ""}', ) try: - await event_service.start_goal("build x", max_iterations=3) - await asyncio.wait_for(event_service._goal_task, timeout=15) + await event_service.start_goal_loop("build x", max_iterations=3) + await asyncio.wait_for(event_service._goal_loop_task, timeout=15) - outcome = event_service._goal_outcome + outcome = event_service._goal_loop_outcome assert outcome is not None assert outcome.status == "complete" assert outcome.iterations == 1 @@ -207,57 +211,57 @@ async def test_goal_defaults_judge_to_agent_llm(event_service, tmp_path): @pytest.mark.asyncio -async def test_start_goal_rejects_empty_objective(event_service, tmp_path): +async def test_start_goal_loop_rejects_empty_objective(event_service, tmp_path): await _start(event_service, tmp_path, "noop") judge = _scripted("{}", usage_id="judge") try: with pytest.raises(ValueError): - await 
event_service.start_goal(" ", judge_llm=judge) + await event_service.start_goal_loop(" ", judge_llm=judge) finally: await event_service.close() @pytest.mark.asyncio -async def test_start_goal_rejects_concurrent_goal(event_service, tmp_path): +async def test_start_goal_loop_rejects_concurrent_goal_loop(event_service, tmp_path): await _start(event_service, tmp_path, "noop") judge = _scripted("{}", usage_id="judge") try: - # Occupy the goal slot with a task that won't finish on its own. - event_service._goal_task = asyncio.create_task(asyncio.sleep(10)) + # Occupy the goal loop slot with a task that won't finish on its own. + event_service._goal_loop_task = asyncio.create_task(asyncio.sleep(10)) with pytest.raises(ValueError, match="goal_already_running"): - await event_service.start_goal("build x", judge_llm=judge) + await event_service.start_goal_loop("build x", judge_llm=judge) finally: - event_service._goal_task.cancel() - event_service._goal_task = None + event_service._goal_loop_task.cancel() + event_service._goal_loop_task = None await event_service.close() @pytest.mark.asyncio -async def test_stop_goal_when_idle_returns_false(event_service, tmp_path): +async def test_stop_goal_loop_when_idle_returns_false(event_service, tmp_path): await _start(event_service, tmp_path, "noop") try: - assert await event_service.stop_goal() is False + assert await event_service.stop_goal_loop() is False finally: await event_service.close() @pytest.mark.asyncio -async def test_user_message_stops_running_goal(event_service, tmp_path): - # A user message (the normal chat path, _from_goal=False) cancels a running - # goal loop before being processed. +async def test_user_message_stops_running_goal_loop(event_service, tmp_path): + # A user message on the normal chat path cancels a running goal loop before + # being processed. 
await _start(event_service, tmp_path, "noop") try: - event_service._goal_task = asyncio.create_task(asyncio.sleep(30)) + event_service._goal_loop_task = asyncio.create_task(asyncio.sleep(30)) await event_service.send_message( Message(role="user", content=[TextContent(text="hello")]), run=False ) - assert event_service._goal_task.done() + assert event_service._goal_loop_task.done() finally: await event_service.close() @pytest.mark.asyncio -async def test_stop_running_goal_emits_interrupted(event_service, tmp_path): +async def test_stop_running_goal_loop_emits_interrupted(event_service, tmp_path): await _start(event_service, tmp_path, "did the work") judge = cast( _GatedLLM, @@ -272,12 +276,14 @@ async def test_stop_running_goal_emits_interrupted(event_service, tmp_path): ), ) try: - await event_service.start_goal("build x", judge_llm=judge, max_iterations=5) + await event_service.start_goal_loop( + "build x", judge_llm=judge, max_iterations=5 + ) # Wait until the judge is blocked: the goal is mid-audit, no run in flight. loop = asyncio.get_running_loop() await loop.run_in_executor(None, judge._entered.wait, 5.0) - assert await event_service.stop_goal() is True + assert await event_service.stop_goal_loop() is True updates = _goal_status_updates(event_service) assert updates[-1]["status"] == "interrupted" assert updates[-1]["active"] is False @@ -290,7 +296,7 @@ async def test_stop_running_goal_emits_interrupted(event_service, tmp_path): @pytest.mark.asyncio async def test_resume_from_interrupted_status(event_service, tmp_path): await _start(event_service, tmp_path, "resumed and finished") - # Simulate a goal that was interrupted at round 1 of 5. + # Simulate a goal loop that was interrupted at round 1 of 5. 
conversation = event_service.get_conversation() with conversation._state: conversation._on_event( @@ -310,10 +316,10 @@ async def test_resume_from_interrupted_status(event_service, tmp_path): '{"score": 1.0, "complete": true, "missing": ""}', usage_id="judge" ) try: - await event_service.resume_goal(judge_llm=judge) - await asyncio.wait_for(event_service._goal_task, timeout=15) + await event_service.resume_goal_loop(judge_llm=judge) + await asyncio.wait_for(event_service._goal_loop_task, timeout=15) - outcome = event_service._goal_outcome + outcome = event_service._goal_loop_outcome assert outcome is not None assert outcome.status == "complete" assert outcome.iterations == 2 # resumed from round 1 -> completed at round 2 @@ -322,19 +328,19 @@ async def test_resume_from_interrupted_status(event_service, tmp_path): @pytest.mark.asyncio -async def test_resume_without_resumable_goal_raises(event_service, tmp_path): +async def test_resume_without_resumable_goal_loop_raises(event_service, tmp_path): await _start(event_service, tmp_path, "noop") judge = _scripted("{}", usage_id="judge") try: with pytest.raises(ValueError, match="no_resumable_goal"): - await event_service.resume_goal(judge_llm=judge) + await event_service.resume_goal_loop(judge_llm=judge) finally: await event_service.close() @pytest.mark.asyncio -async def test_resume_after_completed_goal_raises(event_service, tmp_path): - # A completed (or capped) goal is not resumable. +async def test_resume_after_completed_goal_loop_raises(event_service, tmp_path): + # A completed (or capped) goal loop is not resumable. 
await _start(event_service, tmp_path, "noop") conversation = event_service.get_conversation() with conversation._state: @@ -354,14 +360,14 @@ async def test_resume_after_completed_goal_raises(event_service, tmp_path): judge = _scripted("{}", usage_id="judge") try: with pytest.raises(ValueError, match="no_resumable_goal"): - await event_service.resume_goal(judge_llm=judge) + await event_service.resume_goal_loop(judge_llm=judge) finally: await event_service.close() @pytest.mark.asyncio -async def test_goal_halts_on_run_error_as_interrupted(event_service, tmp_path): - # Simulate "out of credits": the agent's run raises. The goal must record an +async def test_goal_loop_halts_on_run_error_as_interrupted(event_service, tmp_path): + # Simulate "out of credits": the agent's run raises. The goal loop must record an # interrupted (resumable) status, not die silently with no outcome. (tmp_path / "workspace").mkdir(exist_ok=True) await event_service.start() @@ -372,19 +378,21 @@ async def test_goal_halts_on_run_error_as_interrupted(event_service, tmp_path): '{"score": 0.1, "complete": false, "missing": "x"}', usage_id="judge" ) try: - await event_service.start_goal("build x", judge_llm=judge, max_iterations=5) - await asyncio.wait_for(event_service._goal_task, timeout=15) + await event_service.start_goal_loop( + "build x", judge_llm=judge, max_iterations=5 + ) + await asyncio.wait_for(event_service._goal_loop_task, timeout=15) updates = _goal_status_updates(event_service) assert updates[-1]["status"] == "interrupted" assert updates[-1]["active"] is False - assert event_service._goal_outcome is None + assert event_service._goal_loop_outcome is None finally: await event_service.close() @pytest.mark.asyncio -async def test_goal_emits_interrupted_on_unexpected_error(event_service, tmp_path): +async def test_goal_loop_emits_interrupted_on_unexpected_error(event_service, tmp_path): # A judge LLM that *raises* (e.g. 
a network error) crashes the loop via the # generic `except Exception` path -- distinct from a run error surfaced as # ConversationExecutionStatus.ERROR (test above). The loop must still record @@ -395,30 +403,32 @@ async def test_goal_emits_interrupted_on_unexpected_error(event_service, tmp_pat [RuntimeError("judge network error")], usage_id="judge" ) try: - await event_service.start_goal("build x", judge_llm=judge, max_iterations=5) - await asyncio.wait_for(event_service._goal_task, timeout=15) + await event_service.start_goal_loop( + "build x", judge_llm=judge, max_iterations=5 + ) + await asyncio.wait_for(event_service._goal_loop_task, timeout=15) updates = _goal_status_updates(event_service) assert updates[-1]["status"] == "interrupted" assert updates[-1]["active"] is False - assert event_service._goal_outcome is None + assert event_service._goal_loop_outcome is None finally: await event_service.close() @pytest.mark.asyncio -async def test_start_goal_rejected_while_run_active(event_service, tmp_path): +async def test_start_goal_loop_rejected_while_run_active(event_service, tmp_path): # /goal must refuse with conversation_already_running (-> 409) when a normal # run is already in flight, instead of slipping in beside it and judging that # run's unrelated transcript. A placeholder _run_task stands in for the active - # run (same pattern as test_start_goal_rejects_concurrent_goal). + # run (same pattern as test_start_goal_loop_rejects_concurrent_goal_loop). 
await _start(event_service, tmp_path, "noop") judge = _scripted("{}", usage_id="judge") try: event_service._run_task = asyncio.create_task(asyncio.sleep(10)) with pytest.raises(ValueError, match="conversation_already_running"): - await event_service.start_goal("build x", judge_llm=judge) - assert event_service._goal_task is None + await event_service.start_goal_loop("build x", judge_llm=judge) + assert event_service._goal_loop_task is None finally: event_service._run_task.cancel() event_service._run_task = None @@ -426,8 +436,8 @@ async def test_start_goal_rejected_while_run_active(event_service, tmp_path): @pytest.mark.asyncio -async def test_resume_goal_rejected_while_run_active(event_service, tmp_path): - # resume_goal shares start_goal's guard; same placeholder _run_task approach. +async def test_resume_goal_loop_rejected_while_run_active(event_service, tmp_path): + # Resume uses the same busy guard; reuse the placeholder _run_task approach. await _start(event_service, tmp_path, "noop") conversation = event_service.get_conversation() with conversation._state: @@ -448,7 +458,7 @@ async def test_resume_goal_rejected_while_run_active(event_service, tmp_path): try: event_service._run_task = asyncio.create_task(asyncio.sleep(10)) with pytest.raises(ValueError, match="conversation_already_running"): - await event_service.resume_goal(judge_llm=judge) + await event_service.resume_goal_loop(judge_llm=judge) finally: event_service._run_task.cancel() event_service._run_task = None