diff --git a/openhands-agent-server/openhands/agent_server/conversation_router.py b/openhands-agent-server/openhands/agent_server/conversation_router.py
index ed1caf86e1..4009c930c1 100644
--- a/openhands-agent-server/openhands/agent_server/conversation_router.py
+++ b/openhands-agent-server/openhands/agent_server/conversation_router.py
@@ -34,6 +34,7 @@
     SetConfirmationPolicyRequest,
     SetSecurityAnalyzerRequest,
     StartConversationRequest,
+    StartGoalRequest,
     Success,
     UpdateConversationRequest,
     UpdateSecretsRequest,
@@ -294,6 +295,97 @@ async def run_conversation(
     return Success()
 
 
+def _goal_error_to_http(error: ValueError) -> HTTPException:
+    """Map a goal-loop ``ValueError`` to the ``HTTPException`` to raise.
+
+    Busy conditions (an active run or goal loop) become 409 Conflict; any other
+    message is passed through as the detail of a 400 Bad Request.
+    """
+    message = str(error)
+    if message in ("conversation_already_running", "goal_already_running"):
+        return HTTPException(
+            status_code=status.HTTP_409_CONFLICT,
+            detail="Conversation run or goal loop already running.",
+        )
+    return HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message)
+
+
+@conversation_router.post(
+    "/{conversation_id}/goal",
+    responses={
+        404: {"description": "Item not found"},
+        409: {"description": "Conversation run or goal loop is already running"},
+    },
+)
+async def start_goal_in_conversation(
+    conversation_id: UUID,
+    request: StartGoalRequest,
+    conversation_service: ConversationService = Depends(get_conversation_service),
+) -> Success:
+    """Start a ``/goal`` loop inside an existing conversation.
+
+    The loop appends messages and starts agent runs in the same conversation
+    history and event stream as the main chat. It does not create a separate
+    conversation for the goal or fork the existing one.
+    """
+    event_service = await conversation_service.get_event_service(conversation_id)
+    if event_service is None:
+        raise HTTPException(status.HTTP_404_NOT_FOUND)
+
+    try:
+        await event_service.start_goal_loop(
+            request.objective, max_iterations=request.max_iterations
+        )
+    except ValueError as e:
+        # Chain the cause so server logs keep the original service error.
+        raise _goal_error_to_http(e) from e
+
+    return Success()
+
+
+@conversation_router.post(
+    "/{conversation_id}/goal/stop",
+    responses={404: {"description": "Item not found"}},
+)
+async def stop_goal_in_conversation(
+    conversation_id: UUID,
+    conversation_service: ConversationService = Depends(get_conversation_service),
+) -> Success:
+    """Stop the active ``/goal`` loop inside this conversation.
+
+    This cancels only the background goal loop, not the conversation itself, and
+    records an ``interrupted`` goal status so ``/goal/resume`` can continue it.
+    """
+    event_service = await conversation_service.get_event_service(conversation_id)
+    if event_service is None:
+        raise HTTPException(status.HTTP_404_NOT_FOUND)
+    await event_service.stop_goal_loop()
+    return Success()
+
+
+@conversation_router.post(
+    "/{conversation_id}/goal/resume",
+    responses={
+        404: {"description": "Item not found"},
+        409: {"description": "Conversation run or goal loop is already running"},
+    },
+)
+async def resume_goal_in_conversation(
+    conversation_id: UUID,
+    conversation_service: ConversationService = Depends(get_conversation_service),
+) -> Success:
+    """Resume the last interrupted ``/goal`` loop inside this conversation."""
+    event_service = await conversation_service.get_event_service(conversation_id)
+    if event_service is None:
+        raise HTTPException(status.HTTP_404_NOT_FOUND)
+
+    try:
+        await event_service.resume_goal_loop()
+    except ValueError as e:
+        # Chain the cause so server logs keep the original service error.
+        raise _goal_error_to_http(e) from e
+
+    return Success()
+
+
 @conversation_router.post(
     "/{conversation_id}/secrets", responses={404: {"description": "Item not found"}}
 )
diff --git a/openhands-agent-server/openhands/agent_server/event_service.py b/openhands-agent-server/openhands/agent_server/event_service.py
index e8fa2fa17c..c99c663892 100644
--- a/openhands-agent-server/openhands/agent_server/event_service.py
+++ b/openhands-agent-server/openhands/agent_server/event_service.py
@@ -17,9 +17,19 @@
     StoredConversation,
 )
 from openhands.agent_server.pub_sub import PubSub, Subscriber
-from openhands.sdk import LLM, AgentBase, Event, Message, get_logger
+from openhands.sdk import LLM, AgentBase, Event, Message, TextContent, get_logger
 from openhands.sdk.agent import ACPAgent
 from openhands.sdk.conversation.base import
BaseConversation
+from openhands.sdk.conversation.goal import (
+    GoalController,
+    GoalDone,
+    GoalOutcome,
+    GoalStatus,
+    GoalStatusName,
+    GoalStep,
+    GoalVerdict,
+)
+from openhands.sdk.conversation.goal.prompts import RESUME_PROMPT
 from openhands.sdk.conversation.impl.local_conversation import (
     ACP_INFLIGHT_PROMPT_USER_MESSAGE_ID,
     ACP_SUPERSEDE_INFLIGHT_PROMPT,
@@ -92,6 +102,9 @@ class EventService:
     _lease_task: asyncio.Task | None = field(default=None, init=False)
     _external_lease_renewal: bool = field(default=False, init=False)
     _run_executor: ThreadPoolExecutor | None = field(default=None, init=False)
+    # Background task for a /goal loop that is running inside this conversation.
+    _goal_loop_task: asyncio.Task | None = field(default=None, init=False)
+    _goal_loop_outcome: GoalOutcome | None = field(default=None, init=False)
 
     @property
     def conversation_dir(self):
@@ -447,9 +460,15 @@ async def batch_get_events(self, event_ids: list[str]) -> list[Event | None]:
             )
         return results
 
-    async def send_message(self, message: Message, run: bool = False):
+    async def send_message(
+        self, message: Message, run: bool = False, _from_goal_loop: bool = False
+    ):
         if not self._conversation:
             raise ValueError("inactive_service")
+        # A normal user message supersedes any active /goal loop in this
+        # conversation. The goal loop's own messages pass _from_goal_loop=True.
+        if not _from_goal_loop:
+            await self.stop_goal_loop()
         explicit_interrupt_generation = self._explicit_interrupt_generation
         loop = asyncio.get_running_loop()
         await loop.run_in_executor(None, self._conversation.send_message, message)
@@ -989,6 +1008,254 @@ async def _run_and_publish():
         # Create task but don't await it - runs in background
         self._run_task = asyncio.create_task(_run_and_publish())
 
+    async def start_goal_loop(
+        self,
+        objective: str,
+        *,
+        judge_llm: LLM | None = None,
+        max_iterations: int = 10,
+    ) -> None:
+        """Start a ``/goal`` loop inside this conversation.
+
+        Sends the objective, runs the agent, and judges completion after each
+        run, re-prompting until the goal is done or ``max_iterations`` is
+        reached. All work stays in this conversation's event history and stream,
+        exactly like a normal run; this does not create another conversation.
+
+        Args:
+            objective: The goal to pursue and audit against.
+            judge_llm: LLM that grades completion. Defaults to the agent's LLM.
+            max_iterations: Hard cap on audit rounds before giving up.
+
+        Raises:
+            ValueError: If the service is inactive, a goal loop is already
+                running, no judge LLM is available, or the objective is empty.
+        """
+        if not self._conversation or self._closing:
+            raise ValueError("inactive_service")
+        if judge_llm is None:
+            judge_llm = getattr(self._conversation.agent, "llm", None)
+        if judge_llm is None:
+            raise ValueError("no_judge_llm")
+        # GoalController validates the objective/max_iterations (raises ValueError).
+        controller = GoalController(objective, judge_llm, max_iterations=max_iterations)
+        await self._spawn_goal_loop(controller)
+
+    async def _spawn_goal_loop(
+        self, controller: GoalController, *, resume: bool = False
+    ) -> None:
+        """Atomically claim the goal-loop slot and start the background task.
+
+        Shared by :meth:`start_goal_loop` and :meth:`resume_goal_loop` so both
+        apply the same busy guard.
+
+        Raises:
+            ValueError: ``inactive_service`` if the service is closing,
+                ``goal_already_running`` if a goal loop is live, or
+                ``conversation_already_running`` if a run is in flight.
+        """
+        # Under _run_lock, atomically refuse a concurrent goal loop or active
+        # conversation run; otherwise /goal could judge an unrelated transcript.
+        async with self._run_lock:
+            if self._closing:
+                raise ValueError("inactive_service")
+            if self._goal_loop_task is not None and not self._goal_loop_task.done():
+                raise ValueError("goal_already_running")
+            # _run_task first: a live run holds the state lock across its step,
+            # so reading execution status would block behind it.
+            if (self._run_task is not None and not self._run_task.done()) or (
+                await self._get_execution_status()
+                == ConversationExecutionStatus.RUNNING
+            ):
+                raise ValueError("conversation_already_running")
+            # Re-check after the await above: close() runs without _run_lock, so
+            # it may have begun teardown meanwhile (mirrors run()'s post-status
+            # _closing re-check) -- avoid spawning a task close() won't cancel.
+            if self._closing:
+                raise ValueError("inactive_service")
+            self._goal_loop_outcome = None
+            self._goal_loop_task = asyncio.create_task(
+                self._run_goal_loop(controller, resume=resume)
+            )
+
+    async def _run_goal_loop(
+        self, controller: GoalController, *, resume: bool = False
+    ) -> None:
+        """Drive one active ``/goal`` loop inside this conversation.
+
+        Reuses the SDK's transport-agnostic ``GoalController`` for decisions;
+        this method owns only I/O: sending messages, awaiting each run, judging
+        off the event loop, and publishing goal-status updates.
+        """
+        conversation = self._conversation
+        if conversation is None:
+            return
+        loop = asyncio.get_running_loop()
+
+        def _snapshot_and_judge() -> GoalStep:
+            # Snapshot events under the conversation lock, then judge (an LLM
+            # call) with the lock released -- both on this worker thread.
+            with conversation._state:
+                events = list(conversation._state.events)
+            return controller.on_run_finished(events)
+
+        def _user(text: str) -> Message:
+            return Message(role="user", content=[TextContent(text=text)])
+
+        async def _emit_status(
+            *,
+            active: bool,
+            status: GoalStatusName,
+            verdict: GoalVerdict | None = None,
+        ) -> None:
+            # Persist + publish a goal-status update so a UI can render a chip.
+            # ConversationStateUpdateEvent is not LLM-convertible, so it never
+            # enters the agent's or the judge's context.
+            event = ConversationStateUpdateEvent(
+                key="goal",
+                value=GoalStatus(
+                    active=active,
+                    status=status,
+                    iteration=controller.iteration,
+                    max_iterations=controller.max_iterations,
+                    objective=controller.objective,
+                    verdict=verdict,
+                ).model_dump(),
+            )
+
+            def _persist() -> None:
+                with conversation._state:
+                    conversation._on_event(event)
+
+            await loop.run_in_executor(None, _persist)
+
+        try:
+            await _emit_status(active=True, status="running")
+            nudge = RESUME_PROMPT if resume else controller.start()
+            await self.send_message(_user(nudge), run=False, _from_goal_loop=True)
+            while True:
+                try:
+                    await self.run()
+                except ValueError as e:
+                    if str(e) != "conversation_already_running":
+                        raise
+                run_task = self._run_task
+                if run_task is not None:
+                    await asyncio.wait({run_task})
+                status = await self._get_execution_status()
+                if status in (
+                    ConversationExecutionStatus.PAUSED,
+                    ConversationExecutionStatus.ERROR,
+                    ConversationExecutionStatus.STUCK,
+                ):
+                    logger.info("Goal loop halted early: status=%s", status)
+                    await _emit_status(active=False, status="interrupted")
+                    return
+                step = await loop.run_in_executor(None, _snapshot_and_judge)
+                if isinstance(step, GoalDone):
+                    self._goal_loop_outcome = step.outcome
+                    await _emit_status(
+                        active=False,
+                        status=step.outcome.status,
+                        verdict=step.outcome.verdict,
+                    )
+                    logger.info(
+                        "Goal %s after %d round(s)",
+                        step.outcome.status,
+                        step.outcome.iterations,
+                    )
+                    return
+                # Carry the round's verdict so a UI can show per-round judge
+                # feedback (score + what's missing), not just the final one.
+                await _emit_status(active=True, status="running", verdict=step.verdict)
+                await self.send_message(
+                    _user(step.followup), run=False, _from_goal_loop=True
+                )
+        except asyncio.CancelledError:
+            logger.info("Goal loop cancelled")
+            # Explicit stop or user interjection: record a resumable
+            # interrupted status, except during service teardown.
+            if not self._closing:
+                with suppress(Exception):
+                    await _emit_status(active=False, status="interrupted")
+            raise
+        except Exception:
+            logger.exception("Goal loop failed")
+            # An unexpected failure (judge LLM error, controller bug, ...) leaves
+            # the loop dead: record an interrupted status (resumable) so the UI
+            # doesn't show it running. Skip during close(), like the cancel path.
+            if not self._closing:
+                with suppress(Exception):
+                    await _emit_status(active=False, status="interrupted")
+        finally:
+            self._goal_loop_task = None
+
+    async def stop_goal_loop(self) -> bool:
+        """Cancel the active ``/goal`` loop inside this conversation.
+
+        Returns True if a loop was active. Unlike ``interrupt()``, this targets
+        the background goal loop itself and records an ``interrupted`` status so
+        :meth:`resume_goal_loop` can continue it later.
+        """
+        task = self._goal_loop_task
+        if task is None or task.done():
+            return False
+        task.cancel()
+        # asyncio.wait() returns once the task has finished unwinding without
+        # re-raising its CancelledError, so a cancellation aimed at *this*
+        # caller still propagates instead of being swallowed.
+        await asyncio.wait({task})
+        return True
+
+    def _last_goal_loop_status(self) -> dict | None:
+        """Return the most recent goal-status payload, or None if there is none."""
+        conversation = self._conversation
+        if conversation is None:
+            return None
+        with conversation._state:
+            for event in reversed(list(conversation._state.events)):
+                if (
+                    isinstance(event, ConversationStateUpdateEvent)
+                    and event.key == "goal"
+                ):
+                    return event.value if isinstance(event.value, dict) else None
+        return None
+
+    async def resume_goal_loop(
+        self, *, judge_llm: LLM | None = None, max_iterations: int | None = None
+    ) -> None:
+        """Resume the last interrupted ``/goal`` loop in this conversation.
+
+        Reconstructs the loop from the last persisted goal-status event and
+        continues from the iteration it had reached. This works within a session
+        and across a server restart because goal-status events are persisted.
+
+        Raises:
+            ValueError: If the service is inactive, a goal loop is already
+                running, no judge LLM is available, or there is no resumable goal
+                loop because none was started or it already completed/capped.
+        """
+        if not self._conversation or self._closing:
+            raise ValueError("inactive_service")
+        loop = asyncio.get_running_loop()
+        last = await loop.run_in_executor(None, self._last_goal_loop_status)
+        if last is None or last.get("status") in ("complete", "capped"):
+            raise ValueError("no_resumable_goal")
+        if judge_llm is None:
+            judge_llm = getattr(self._conversation.agent, "llm", None)
+        if judge_llm is None:
+            raise ValueError("no_judge_llm")
+        try:
+            objective = last["objective"]
+            cap = max_iterations or int(last["max_iterations"])
+            reached = int(last["iteration"])
+        except (KeyError, TypeError, ValueError) as e:
+            # Malformed persisted payload: report it as not resumable, not a crash.
+            raise ValueError("no_resumable_goal") from e
+        controller = GoalController(objective, judge_llm, max_iterations=cap)
+        controller.iteration = reached
+        await self._spawn_goal_loop(controller, resume=True)
+
     async def respond_to_confirmation(self, request: ConfirmationResponseRequest):
         if request.accept:
             try:
@@ -1109,6 +1376,15 @@ async def close(self):
         self._explicit_interrupt_generation += 1
         self._rerun_requested = False
         self._acp_internal_rerun_requested = False
+
+        # Cancel any in-progress /goal loop first so it cannot start a new run
+        # while we drain the current one below.
+        if self._goal_loop_task is not None and not self._goal_loop_task.done():
+            self._goal_loop_task.cancel()
+            with suppress(asyncio.CancelledError):
+                await self._goal_loop_task
+            self._goal_loop_task = None
+
         if self._lease_task is not None:
             self._lease_task.cancel()
             with suppress(asyncio.CancelledError):
diff --git a/openhands-agent-server/openhands/agent_server/models.py b/openhands-agent-server/openhands/agent_server/models.py
index c94cd89ca1..f59b875b9a 100644
--- a/openhands-agent-server/openhands/agent_server/models.py
+++ b/openhands-agent-server/openhands/agent_server/models.py
@@ -499,6 +499,15 @@ class AskAgentResponse(BaseModel):
     response: str = Field(description="The agent's response to the question")
 
 
+class StartGoalRequest(BaseModel):
+    """Payload to start a ``/goal`` loop inside a conversation."""
+
+    objective: str = Field(description="The goal objective to pursue and audit.")
+    max_iterations: int = Field(
+        default=10, ge=1, description="Maximum audit rounds before giving up."
+    )
+
+
 class AgentResponseResult(BaseModel):
     """The agent's final response for a conversation.
 
diff --git a/openhands-sdk/openhands/sdk/conversation/goal/controller.py b/openhands-sdk/openhands/sdk/conversation/goal/controller.py
index 3ea92524bf..6bc367dc7d 100644
--- a/openhands-sdk/openhands/sdk/conversation/goal/controller.py
+++ b/openhands-sdk/openhands/sdk/conversation/goal/controller.py
@@ -60,6 +60,9 @@ class GoalContinue(BaseModel):
     """Decision to keep going: send ``followup`` before the next run."""
 
     followup: str
+    verdict: GoalVerdict = Field(
+        description="The judge's verdict for the round that just finished."
+    )
 
 
 class GoalDone(BaseModel):
@@ -127,4 +130,4 @@ def on_run_finished(self, events: Sequence[Event]) -> GoalStep:
             )
         missing = verdict.missing or "Some requirements are not yet verified."
         followup = FOLLOWUP_PROMPT.format(iteration=self.iteration, missing=missing)
-        return GoalContinue(followup=followup)
+        return GoalContinue(followup=followup, verdict=verdict)
diff --git a/tests/agent_server/test_conversation_router.py b/tests/agent_server/test_conversation_router.py
index 51a570487c..924b6a82ce 100644
--- a/tests/agent_server/test_conversation_router.py
+++ b/tests/agent_server/test_conversation_router.py
@@ -1093,6 +1093,163 @@ def test_run_conversation_not_found(
         client.app.dependency_overrides.clear()
 
 
+def test_start_goal_in_conversation_success(
+    client, mock_conversation_service, mock_event_service, sample_conversation_id
+):
+    """/goal endpoint forwards the objective to the event service."""
+    mock_conversation_service.get_event_service.return_value = mock_event_service
+    mock_event_service.start_goal_loop.return_value = None
+
+    client.app.dependency_overrides[get_conversation_service] = (
+        lambda: mock_conversation_service
+    )
+    try:
+        response = client.post(
+            f"/api/conversations/{sample_conversation_id}/goal",
+            json={"objective": "build x"},
+        )
+        assert response.status_code == 200
+        assert response.json()["success"] is True
+        mock_event_service.start_goal_loop.assert_awaited_once_with(
+            "build x", max_iterations=10
+        )
+    finally:
+        client.app.dependency_overrides.clear()
+
+
+def test_start_goal_in_conversation_not_found(
+    client, mock_conversation_service, sample_conversation_id
+):
+    """/goal returns 404 when the conversation is unknown."""
+    mock_conversation_service.get_event_service.return_value = None
+
+    client.app.dependency_overrides[get_conversation_service] = (
+        lambda: mock_conversation_service
+    )
+    try:
+        response = client.post(
+            f"/api/conversations/{sample_conversation_id}/goal",
+            json={"objective": "build x"},
+        )
+        assert response.status_code == 404
+    finally:
+        client.app.dependency_overrides.clear()
+
+
+def test_start_goal_in_conversation_rejects_busy_loop(
+    client, mock_conversation_service, mock_event_service, sample_conversation_id
+):
+    """/goal returns 409 when a goal loop or conversation run is active."""
+    mock_conversation_service.get_event_service.return_value = mock_event_service
+
+    client.app.dependency_overrides[get_conversation_service] = (
+        lambda: mock_conversation_service
+    )
+    try:
+        # Both busy conditions named in the docstring must map to 409.
+        for code in ("goal_already_running", "conversation_already_running"):
+            mock_event_service.start_goal_loop.side_effect = ValueError(code)
+            response = client.post(
+                f"/api/conversations/{sample_conversation_id}/goal",
+                json={"objective": "build x"},
+            )
+            assert response.status_code == 409
+    finally:
+        client.app.dependency_overrides.clear()
+
+
+def test_stop_goal_in_conversation_success(
+    client, mock_conversation_service, mock_event_service, sample_conversation_id
+):
+    """/goal/stop endpoint forwards to the event service."""
+    mock_conversation_service.get_event_service.return_value = mock_event_service
+    mock_event_service.stop_goal_loop.return_value = True
+
+    client.app.dependency_overrides[get_conversation_service] = (
+        lambda: mock_conversation_service
+    )
+    try:
+        response = client.post(f"/api/conversations/{sample_conversation_id}/goal/stop")
+        assert response.status_code == 200
+        assert response.json()["success"] is True
+        mock_event_service.stop_goal_loop.assert_awaited_once()
+    finally:
+        client.app.dependency_overrides.clear()
+
+
+def test_stop_goal_in_conversation_not_found(
+    client, mock_conversation_service, sample_conversation_id
+):
+    """/goal/stop returns 404 when the conversation is unknown."""
+    mock_conversation_service.get_event_service.return_value = None
+
+    client.app.dependency_overrides[get_conversation_service] = (
+        lambda: mock_conversation_service
+    )
+    try:
+        response = client.post(f"/api/conversations/{sample_conversation_id}/goal/stop")
+        assert response.status_code == 404
+    finally:
+        client.app.dependency_overrides.clear()
+
+
+def test_resume_goal_in_conversation_success(
+    client, mock_conversation_service, mock_event_service, sample_conversation_id
+):
+    """/goal/resume endpoint forwards to the event service."""
+    mock_conversation_service.get_event_service.return_value = mock_event_service
+    mock_event_service.resume_goal_loop.return_value = None
+
+    client.app.dependency_overrides[get_conversation_service] = (
+        lambda: mock_conversation_service
+    )
+    try:
+        response = client.post(
+            f"/api/conversations/{sample_conversation_id}/goal/resume"
+        )
+        assert response.status_code == 200
+        assert response.json()["success"] is True
+        mock_event_service.resume_goal_loop.assert_awaited_once()
+    finally:
+        client.app.dependency_overrides.clear()
+
+
+def test_resume_goal_in_conversation_not_found(
+    client, mock_conversation_service, sample_conversation_id
+):
+    """/goal/resume returns 404 when the conversation is unknown."""
+    mock_conversation_service.get_event_service.return_value = None
+
+    client.app.dependency_overrides[get_conversation_service] = (
+        lambda: mock_conversation_service
+    )
+    try:
+        response = client.post(
+            f"/api/conversations/{sample_conversation_id}/goal/resume"
+        )
+        assert response.status_code == 404
+    finally:
+        client.app.dependency_overrides.clear()
+
+
+def test_resume_goal_in_conversation_no_resumable(
+    client, mock_conversation_service, mock_event_service, sample_conversation_id
+):
+    """/goal/resume returns 400 when there is nothing to resume."""
+    mock_conversation_service.get_event_service.return_value = mock_event_service
+    mock_event_service.resume_goal_loop.side_effect = ValueError("no_resumable_goal")
+
+    client.app.dependency_overrides[get_conversation_service] = (
+        lambda: mock_conversation_service
+    )
+    try:
+        response = client.post(
+            f"/api/conversations/{sample_conversation_id}/goal/resume"
+        )
+        assert response.status_code == 400
+    finally:
+        client.app.dependency_overrides.clear()
+
+
 def test_switch_acp_model_success(
     client, mock_conversation_service, mock_event_service, sample_conversation_id
 ):
diff --git a/tests/agent_server/test_goal_loop.py
b/tests/agent_server/test_goal_loop.py
new file mode 100644
index 0000000000..ecc7dd00b7
--- /dev/null
+++ b/tests/agent_server/test_goal_loop.py
@@ -0,0 +1,452 @@
+"""Tests for the /goal driver loop in EventService (the agent-server side).
+
+These drive a real EventService + LocalConversation with a scripted TestLLM
+agent (each run finishes on a content-only reply) and a separate scripted judge.
+"""
+
+import asyncio
+from threading import Event
+from typing import cast
+from unittest.mock import MagicMock, patch
+from uuid import uuid4
+
+import pytest
+from pydantic import PrivateAttr, SecretStr
+
+from openhands.agent_server.event_service import EventService
+from openhands.agent_server.models import StoredConversation
+from openhands.sdk.agent import Agent
+from openhands.sdk.event.conversation_state import ConversationStateUpdateEvent
+from openhands.sdk.llm import LLM, Message, TextContent
+from openhands.sdk.testing import TestLLM
+from openhands.sdk.workspace import LocalWorkspace
+
+
+def _scripted(*texts: str, usage_id: str) -> TestLLM:
+    return TestLLM.from_messages(
+        [Message(role="assistant", content=[TextContent(text=t)]) for t in texts],
+        usage_id=usage_id,
+    )
+
+
+class _GatedLLM(TestLLM):
+    """Judge LLM that blocks until released, signalling when it has entered.
+
+    Lets a test deterministically catch the goal loop mid-audit (no run in
+    flight) and stop it.
+    """
+
+    _gate: Event = PrivateAttr(default_factory=Event)
+    _entered: Event = PrivateAttr(default_factory=Event)
+
+    def completion(
+        self,
+        messages,
+        tools=None,
+        _return_metrics=False,
+        add_security_risk_prediction=False,
+        on_token=None,
+        **kwargs,
+    ):
+        self._entered.set()
+        self._gate.wait(timeout=10)
+        return super().completion(
+            messages,
+            tools,
+            _return_metrics,
+            add_security_risk_prediction,
+            on_token,
+            **kwargs,
+        )
+
+
+def _goal_status_updates(event_service: EventService) -> list:
+    return [
+        e.value
+        for e in event_service.get_conversation()._state.events
+        if isinstance(e, ConversationStateUpdateEvent) and e.key == "goal"
+    ]
+
+
+@pytest.fixture
+def event_service(tmp_path):
+    with patch("openhands.sdk.llm.utils.model_info.httpx.get") as mock_get:
+        mock_get.return_value = MagicMock(json=lambda: {"data": []})
+        service = EventService(
+            stored=StoredConversation(
+                id=uuid4(),
+                agent=Agent(
+                    llm=LLM(
+                        usage_id="agent", model="test-model", api_key=SecretStr("x")
+                    ),
+                    tools=[],
+                ),
+                workspace=LocalWorkspace(working_dir=str(tmp_path / "workspace")),
+            ),
+            conversations_dir=tmp_path / "conversations",
+        )
+        yield service
+
+
+async def _start(service: EventService, tmp_path, *agent_turns: str) -> None:
+    """Start the service and install a scripted agent LLM (one reply per run)."""
+    (tmp_path / "workspace").mkdir(exist_ok=True)
+    await service.start()
+    service.get_conversation().switch_llm(_scripted(*agent_turns, usage_id="agent"))
+
+
+_DONE = '{"score": 1.0, "complete": true, "missing": ""}'
+_NOT_DONE = '{"score": 0.2, "complete": false, "missing": "tests"}'
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    ("agent_turns", "verdicts", "max_iterations", "status", "iterations"),
+    [
+        (("turn 1", "turn 2"), (_NOT_DONE, _DONE), 5, "complete", 2),
+        (("turn 1", "turn 2"), (_NOT_DONE, _NOT_DONE), 2, "capped", 2),
+    ],
+    ids=["completes-after-two-rounds", "caps-at-max"],
+)
+async def test_goal_loop_outcomes(
+    event_service, tmp_path, agent_turns, verdicts, max_iterations, status, iterations
+):
+    await _start(event_service, tmp_path, *agent_turns)
+    judge = _scripted(*verdicts, usage_id="judge")
+    try:
+        await event_service.start_goal_loop(
+            "build x", judge_llm=judge, max_iterations=max_iterations
+        )
+        await asyncio.wait_for(event_service._goal_loop_task, timeout=15)
+
+        outcome = event_service._goal_loop_outcome
+        assert outcome is not None
+        assert outcome.status == status
+        assert outcome.iterations == iterations
+    finally:
+        await event_service.close()
+
+
+@pytest.mark.asyncio
+async def test_goal_loop_emits_status_events(event_service, tmp_path):
+    # The loop publishes ConversationStateUpdateEvent(key="goal") at each
+    # lifecycle point; they are persisted to the shared event log (and streamed
+    # to subscribers) so a UI can render a progress chip.
+    await _start(event_service, tmp_path, "did the work")
+    judge = _scripted(_DONE, usage_id="judge")
+    try:
+        await event_service.start_goal_loop(
+            "build x", judge_llm=judge, max_iterations=3
+        )
+        await asyncio.wait_for(event_service._goal_loop_task, timeout=15)
+
+        updates = _goal_status_updates(event_service)
+        assert updates, "expected goal-status events"
+        # First update: running + active. Last: complete + inactive.
+        assert updates[0]["active"] is True
+        assert updates[0]["status"] == "running"
+        assert updates[-1]["active"] is False
+        assert updates[-1]["status"] == "complete"
+        assert updates[-1]["iteration"] == 1
+        assert updates[-1]["objective"] == "build x"
+    finally:
+        await event_service.close()
+
+
+@pytest.mark.asyncio
+async def test_goal_loop_emits_per_round_verdicts(event_service, tmp_path):
+    # Each continuing round publishes its judge verdict (score + missing) on the
+    # running status event, so a UI can show per-round feedback, not just the
+    # terminal verdict.
+    await _start(event_service, tmp_path, "turn 1", "turn 2")
+    judge = _scripted(_NOT_DONE, _DONE, usage_id="judge")
+    try:
+        await event_service.start_goal_loop(
+            "build x", judge_llm=judge, max_iterations=5
+        )
+        await asyncio.wait_for(event_service._goal_loop_task, timeout=15)
+
+        updates = _goal_status_updates(event_service)
+        # The kickoff update (iteration 0) has no verdict yet.
+        kickoff = next(u for u in updates if u["iteration"] == 0)
+        assert kickoff["verdict"] is None
+        # The mid-loop "running" update for round 1 carries the round's verdict.
+        round_one = next(
+            u for u in updates if u["status"] == "running" and u["iteration"] == 1
+        )
+        assert round_one["verdict"] is not None
+        assert round_one["verdict"]["score"] == 0.2
+        assert round_one["verdict"]["missing"] == "tests"
+    finally:
+        await event_service.close()
+
+
+@pytest.mark.asyncio
+async def test_goal_loop_defaults_judge_to_agent_llm(event_service, tmp_path):
+    # No judge_llm passed -> the agent's own LLM is used as the judge, so its
+    # scripted queue serves both the agent turn and the verdict. This is the
+    # path the POST /goal endpoint always takes.
+    await _start(event_service, tmp_path, "did the work", _DONE)
+    try:
+        await event_service.start_goal_loop("build x", max_iterations=3)
+        await asyncio.wait_for(event_service._goal_loop_task, timeout=15)
+
+        outcome = event_service._goal_loop_outcome
+        assert outcome is not None
+        assert outcome.status == "complete"
+        assert outcome.iterations == 1
+    finally:
+        await event_service.close()
+
+
+@pytest.mark.asyncio
+async def test_start_goal_loop_rejects_empty_objective(event_service, tmp_path):
+    await _start(event_service, tmp_path, "noop")
+    judge = _scripted("{}", usage_id="judge")
+    try:
+        with pytest.raises(ValueError):
+            await event_service.start_goal_loop(" ", judge_llm=judge)
+    finally:
+        await event_service.close()
+
+
+@pytest.mark.asyncio
+async def test_start_goal_loop_rejects_concurrent_goal_loop(event_service, tmp_path):
+    await _start(event_service, tmp_path, "noop")
+    judge = _scripted("{}", usage_id="judge")
+    try:
+        # Occupy the goal loop slot with a task that won't finish on its own.
+        event_service._goal_loop_task = asyncio.create_task(asyncio.sleep(10))
+        with pytest.raises(ValueError, match="goal_already_running"):
+            await event_service.start_goal_loop("build x", judge_llm=judge)
+    finally:
+        event_service._goal_loop_task.cancel()
+        event_service._goal_loop_task = None
+        await event_service.close()
+
+
+@pytest.mark.asyncio
+async def test_stop_goal_loop_when_idle_returns_false(event_service, tmp_path):
+    await _start(event_service, tmp_path, "noop")
+    try:
+        assert await event_service.stop_goal_loop() is False
+    finally:
+        await event_service.close()
+
+
+@pytest.mark.asyncio
+async def test_user_message_stops_running_goal_loop(event_service, tmp_path):
+    # A user message on the normal chat path cancels a running goal loop before
+    # being processed.
+    await _start(event_service, tmp_path, "noop")
+    try:
+        event_service._goal_loop_task = asyncio.create_task(asyncio.sleep(30))
+        await event_service.send_message(
+            Message(role="user", content=[TextContent(text="hello")]), run=False
+        )
+        assert event_service._goal_loop_task.done()
+    finally:
+        await event_service.close()
+
+
+@pytest.mark.asyncio
+async def test_stop_running_goal_loop_emits_interrupted(event_service, tmp_path):
+    await _start(event_service, tmp_path, "did the work")
+    judge = cast(
+        _GatedLLM,
+        _GatedLLM.from_messages(
+            [
+                Message(
+                    role="assistant",
+                    content=[TextContent(text='{"score": 0.2, "complete": false}')],
+                )
+            ],
+            usage_id="judge",
+        ),
+    )
+    try:
+        await event_service.start_goal_loop(
+            "build x", judge_llm=judge, max_iterations=5
+        )
+        # Wait until the judge is blocked: the goal is mid-audit, no run in flight.
+        loop = asyncio.get_running_loop()
+        assert await loop.run_in_executor(None, judge._entered.wait, 5.0)
+
+        assert await event_service.stop_goal_loop() is True
+        updates = _goal_status_updates(event_service)
+        assert updates[-1]["status"] == "interrupted"
+        assert updates[-1]["active"] is False
+        assert updates[-1]["iteration"] == 1
+    finally:
+        judge._gate.set()  # release the orphaned judge thread for cleanup
+        await event_service.close()
+
+
+@pytest.mark.asyncio
+async def test_resume_from_interrupted_status(event_service, tmp_path):
+    await _start(event_service, tmp_path, "resumed and finished")
+    # Simulate a goal loop that was interrupted at round 1 of 5.
+    conversation = event_service.get_conversation()
+    with conversation._state:
+        conversation._on_event(
+            ConversationStateUpdateEvent(
+                key="goal",
+                value={
+                    "active": False,
+                    "status": "interrupted",
+                    "iteration": 1,
+                    "max_iterations": 5,
+                    "objective": "build x",
+                    "verdict": None,
+                },
+            )
+        )
+    judge = _scripted(_DONE, usage_id="judge")
+    try:
+        await event_service.resume_goal_loop(judge_llm=judge)
+        await asyncio.wait_for(event_service._goal_loop_task, timeout=15)
+
+        outcome = event_service._goal_loop_outcome
+        assert outcome is not None
+        assert outcome.status == "complete"
+        assert outcome.iterations == 2  # resumed from round 1 -> completed at round 2
+    finally:
+        await event_service.close()
+
+
+@pytest.mark.asyncio
+async def test_resume_without_resumable_goal_loop_raises(event_service, tmp_path):
+    await _start(event_service, tmp_path, "noop")
+    judge = _scripted("{}", usage_id="judge")
+    try:
+        with pytest.raises(ValueError, match="no_resumable_goal"):
+            await event_service.resume_goal_loop(judge_llm=judge)
+    finally:
+        await event_service.close()
+
+
+@pytest.mark.asyncio
+async def test_resume_after_completed_goal_loop_raises(event_service, tmp_path):
+    # A completed (or capped) goal loop is not resumable.
+    await _start(event_service, tmp_path, "noop")
+    conversation = event_service.get_conversation()
+    with conversation._state:
+        conversation._on_event(
+            ConversationStateUpdateEvent(
+                key="goal",
+                value={
+                    "active": False,
+                    "status": "complete",
+                    "iteration": 2,
+                    "max_iterations": 5,
+                    "objective": "build x",
+                    "verdict": None,
+                },
+            )
+        )
+    judge = _scripted("{}", usage_id="judge")
+    try:
+        with pytest.raises(ValueError, match="no_resumable_goal"):
+            await event_service.resume_goal_loop(judge_llm=judge)
+    finally:
+        await event_service.close()
+
+
+@pytest.mark.asyncio
+async def test_goal_loop_halts_on_run_error_as_interrupted(event_service, tmp_path):
+    # Simulate "out of credits": the agent's run raises. The goal loop must record an
+    # interrupted (resumable) status, not die silently with no outcome.
+    (tmp_path / "workspace").mkdir(exist_ok=True)
+    await event_service.start()
+    event_service.get_conversation().switch_llm(
+        TestLLM.from_messages([RuntimeError("out of credits")], usage_id="agent")
+    )
+    judge = _scripted(
+        '{"score": 0.1, "complete": false, "missing": "x"}', usage_id="judge"
+    )
+    try:
+        await event_service.start_goal_loop(
+            "build x", judge_llm=judge, max_iterations=5
+        )
+        await asyncio.wait_for(event_service._goal_loop_task, timeout=15)
+
+        updates = _goal_status_updates(event_service)
+        assert updates[-1]["status"] == "interrupted"
+        assert updates[-1]["active"] is False
+        assert event_service._goal_loop_outcome is None
+    finally:
+        await event_service.close()
+
+
+@pytest.mark.asyncio
+async def test_goal_loop_emits_interrupted_on_unexpected_error(event_service, tmp_path):
+    # A judge LLM that *raises* (e.g. a network error) crashes the loop via the
+    # generic `except Exception` path -- distinct from a run error surfaced as
+    # ConversationExecutionStatus.ERROR (test above).
The loop must still record + # a terminal interrupted (resumable) status; otherwise the last persisted + # event stays active=True/running and the UI shows a dead goal as running. + await _start(event_service, tmp_path, "did the work") + judge = TestLLM.from_messages( + [RuntimeError("judge network error")], usage_id="judge" + ) + try: + await event_service.start_goal_loop( + "build x", judge_llm=judge, max_iterations=5 + ) + await asyncio.wait_for(event_service._goal_loop_task, timeout=15) + + updates = _goal_status_updates(event_service) + assert updates[-1]["status"] == "interrupted" + assert updates[-1]["active"] is False + assert event_service._goal_loop_outcome is None + finally: + await event_service.close() + + +@pytest.mark.asyncio +async def test_start_goal_loop_rejected_while_run_active(event_service, tmp_path): + # /goal must refuse with conversation_already_running (-> 409) when a normal + # run is already in flight, instead of slipping in beside it and judging that + # run's unrelated transcript. A placeholder _run_task stands in for the active + # run (same pattern as test_start_goal_loop_rejects_concurrent_goal_loop). + await _start(event_service, tmp_path, "noop") + judge = _scripted("{}", usage_id="judge") + try: + event_service._run_task = asyncio.create_task(asyncio.sleep(10)) + with pytest.raises(ValueError, match="conversation_already_running"): + await event_service.start_goal_loop("build x", judge_llm=judge) + assert event_service._goal_loop_task is None + finally: + event_service._run_task.cancel() + event_service._run_task = None + await event_service.close() + + +@pytest.mark.asyncio +async def test_resume_goal_loop_rejected_while_run_active(event_service, tmp_path): + # Resume uses the same busy guard; reuse the placeholder _run_task approach. 
+ await _start(event_service, tmp_path, "noop") + conversation = event_service.get_conversation() + with conversation._state: + conversation._on_event( + ConversationStateUpdateEvent( + key="goal", + value={ + "active": False, + "status": "interrupted", + "iteration": 1, + "max_iterations": 5, + "objective": "build x", + "verdict": None, + }, + ) + ) + judge = _scripted("{}", usage_id="judge") + try: + event_service._run_task = asyncio.create_task(asyncio.sleep(10)) + with pytest.raises(ValueError, match="conversation_already_running"): + await event_service.resume_goal_loop(judge_llm=judge) + finally: + event_service._run_task.cancel() + event_service._run_task = None + await event_service.close() diff --git a/tests/sdk/conversation/goal/test_controller.py b/tests/sdk/conversation/goal/test_controller.py index 49b60cd977..c99f4c9d70 100644 --- a/tests/sdk/conversation/goal/test_controller.py +++ b/tests/sdk/conversation/goal/test_controller.py @@ -34,6 +34,11 @@ def test_continue_when_incomplete(): assert isinstance(step, GoalContinue) assert "tests" in step.followup assert controller.iteration == 1 + # The verdict for the round just finished rides along so a driver can + # publish per-round judge feedback, not just the terminal one. + assert step.verdict.score == 0.2 + assert step.verdict.complete is False + assert step.verdict.missing == "tests" def test_done_when_complete():