Skip to content

Commit ff8e0bf

Browse files
Jeomon and claude committed
Refactor self-improvement restart: clearer naming and deferred task resumption
- Rename restart.json keys: task → resume_task, original_task → deferred_task, for immediate readability at a glance.
- Merge startup_ok.json into restart.json: on_ready now deletes restart.json (absence = startup succeeded) instead of writing a separate probe file.
- Preserve deferred_task across self-correction retry cycles in control_center so the original continuation survives multiple fix attempts.
- Resume deferred_task on clean recovery success (run_id set, no startup_error) rather than dispatching the agent's fix-cycle continue_with.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 8bce7cd commit ff8e0bf

2 files changed

Lines changed: 35 additions & 31 deletions

File tree

operator_use/agent/tools/builtin/control_center.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@ async def control_center(
250250
# belongs to the same failure run and appends to the same log group.
251251
run_id = kwargs.get("_run_id")
252252
restart_data: dict = {
253-
"task": continue_with,
253+
"resume_task": continue_with,
254254
"channel": channel,
255255
"chat_id": chat_id,
256256
"account_id": account_id,
@@ -259,6 +259,16 @@ async def control_center(
259259
restart_data["improvement_session"] = improvement_session
260260
if run_id:
261261
restart_data["run_id"] = run_id
262+
# Preserve deferred_task across self-correction retries so the
263+
# clean worker can resume it once the fix succeeds.
264+
try:
265+
if RESTART_FILE.exists():
266+
_prev = json.loads(RESTART_FILE.read_text(encoding="utf-8"))
267+
_orig = _prev.get("deferred_task")
268+
if _orig:
269+
restart_data["deferred_task"] = _orig
270+
except Exception:
271+
pass
262272
try:
263273
RESTART_FILE.parent.mkdir(parents=True, exist_ok=True)
264274
RESTART_FILE.write_text(json.dumps(restart_data), encoding="utf-8")

operator_use/cli/start.py

Lines changed: 24 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,7 @@ def copy_templates_to_workspace(user_data_dir: Path, workspace: Path) -> None:
361361
shutil.copy2(f, dest_file)
362362

363363
async def _build_recovery_message(
364-
original_task: str,
364+
deferred_task: str,
365365
improvement_session: str,
366366
startup_error: str,
367367
run_id: str,
@@ -395,7 +395,7 @@ async def _build_recovery_message(
395395
f"**Error:**\n```\n{startup_error[-1500:]}\n```\n\n"
396396
f"**Changes you made (diffs):**\n{diff_text}\n\n"
397397
f"**Previous attempt history:**\n{log_text}\n\n"
398-
f"**Original task:**\n{original_task}\n\n"
398+
f"**Deferred task:**\n{deferred_task}\n\n"
399399
f"Analyze the error, determine the root cause, and try a corrected approach."
400400
)
401401

@@ -424,7 +424,7 @@ async def _build_recovery_message(
424424
f"**Analysis:**\n{synthesis}\n\n"
425425
f"**Full error:**\n```\n{startup_error[-1000:]}\n```\n\n"
426426
f"**Diffs:**\n{diff_text}\n\n"
427-
f"**Original task:**\n{original_task}\n\n"
427+
f"**Deferred task:**\n{deferred_task}\n\n"
428428
f"Files have been reverted. Please try a corrected approach."
429429
)
430430
except Exception as exc:
@@ -453,17 +453,17 @@ async def main():
453453
copy_templates_to_workspace(USERDATA_DIR, workspace=_resolve_agent_workspace(defn))
454454

455455
bus = Bus()
456-
_startup_ok_file = USERDATA_DIR / "startup_ok.json"
456+
_restart_file = USERDATA_DIR / "restart.json"
457457

458458
def _on_gateway_ready() -> None:
459459
"""Called by the gateway the moment all channels are live.
460-
Writing this file is the probe-passed signal — if the worker
461-
exits before it appears, the supervisor knows startup failed."""
460+
Deleting restart.json is the startup-ok signal — its presence after
461+
worker exit means the gateway never came up (startup failure)."""
462462
try:
463-
_startup_ok_file.write_text("{}", encoding="utf-8")
464-
logger.info("startup_ok.json written — probe passed")
463+
_restart_file.unlink(missing_ok=True)
464+
logger.info("restart.json deleted — startup probe passed")
465465
except Exception as exc:
466-
logger.warning("Could not write startup_ok.json: %s", exc)
466+
logger.warning("Could not delete restart.json in on_ready: %s", exc)
467467

468468
gateway = Gateway(bus=bus, on_ready=_on_gateway_ready)
469469

@@ -704,36 +704,39 @@ async def on_heartbeat(content: str) -> None:
704704
if restart_file.exists():
705705
import json as _json
706706
restart_data = _json.loads(restart_file.read_text(encoding="utf-8"))
707-
restart_file.unlink()
708-
task = restart_data.get("task", "")
707+
resume_task = restart_data.get("resume_task", "")
709708
resume_channel = restart_data.get("channel")
710709
resume_chat_id = restart_data.get("chat_id")
711710
resume_account_id = restart_data.get("account_id", "")
712711
improvement_session = restart_data.get("improvement_session")
713712
startup_error = restart_data.get("startup_error")
714-
original_task = restart_data.get("original_task", task)
713+
deferred_task = restart_data.get("deferred_task", resume_task)
715714
run_id = restart_data.get("run_id")
716715
# Inject run_id into all agents so control_center can carry it
717716
# forward into the next restart.json if the agent retries.
718717
if run_id:
719718
for _agent in agents.values():
720719
_agent.tool_register.set_extension("_run_id", run_id)
721-
print(f"[restart] Continuation found (channel={resume_channel} chat_id={resume_chat_id}): {task[:80]}", flush=True)
722-
if task and resume_channel and resume_chat_id:
720+
print(f"[restart] Continuation found (channel={resume_channel} chat_id={resume_chat_id}): {resume_task[:80]}", flush=True)
721+
if resume_task and resume_channel and resume_chat_id:
723722
async def _dispatch_continuation():
724723
await asyncio.sleep(10)
725-
final_task = task
724+
final_task = resume_task
726725

727726
# Recovery path: build an informative message for the agent.
728727
if improvement_session and startup_error and run_id:
729728
final_task = await _build_recovery_message(
730-
original_task=original_task,
729+
deferred_task=deferred_task,
731730
improvement_session=improvement_session,
732731
startup_error=startup_error,
733732
run_id=run_id,
734733
agents=agents,
735734
userdata=USERDATA_DIR,
736735
)
736+
elif run_id and not startup_error and deferred_task and deferred_task != resume_task:
737+
# Successful restart after one or more recovery cycles —
738+
# resume the deferred task now that the fix succeeded.
739+
final_task = deferred_task
737740

738741
print(f"[restart] Dispatching continuation to channel={resume_channel} chat_id={resume_chat_id}", flush=True)
739742
await bus.publish_incoming(
@@ -794,17 +797,12 @@ def _attempt_startup_recovery(exit_code: int) -> bool:
794797

795798
userdata = _gud()
796799
restart_file = userdata / "restart.json"
797-
startup_ok_file = userdata / "startup_ok.json"
798800
error_file = userdata / "startup_error.json"
799801

802+
# If restart.json is absent, on_ready already deleted it — worker started fine.
800803
if not restart_file.exists():
801804
return False
802805

803-
# If startup_ok.json exists the worker proved it could start — the crash
804-
# happened during normal runtime, not during startup. Don't revert.
805-
if startup_ok_file.exists():
806-
return False
807-
808806
try:
809807
restart_data = _json.loads(restart_file.read_text(encoding="utf-8"))
810808
except Exception:
@@ -842,15 +840,16 @@ def _attempt_startup_recovery(exit_code: int) -> bool:
842840

843841
# Determine run_id: carry forward the existing one if this is a retry,
844842
# otherwise generate a fresh one to mark the start of a new failure run.
845-
original_task = restart_data.get("task", "")
843+
# Preserve deferred_task across retries — first cycle seeds it from resume_task.
844+
deferred_task = restart_data.get("deferred_task") or restart_data.get("resume_task", "")
846845
run_id: str = restart_data.get("run_id") or f"R-{__import__('datetime').datetime.now().strftime('%Y%m%dT%H%M%S')}"
847846
print(f"[supervisor] run_id={run_id}", flush=True)
848847

849848
# Append to consecutive failure log.
850849
try:
851850
InterceptorLog(userdata).append(
852851
run_id=run_id,
853-
task_preview=original_task,
852+
task_preview=deferred_task,
854853
session_id=improvement_session,
855854
files_changed=reverted_files,
856855
error_preview=error_text,
@@ -861,7 +860,7 @@ def _attempt_startup_recovery(exit_code: int) -> bool:
861860

862861
# Rewrite restart.json — keep improvement_session, run_id, and raw error for
863862
# the LLM synthesis step that runs inside the clean worker's startup.
864-
restart_data["original_task"] = original_task
863+
restart_data["deferred_task"] = deferred_task
865864
restart_data["startup_error"] = error_text[-3000:]
866865
restart_data["run_id"] = run_id
867866
# resume_task field left as-is so channel/chat_id dispatch still works
@@ -890,11 +889,6 @@ def run(verbose: bool = False) -> None:
890889
import sys
891890

892891
if os.getenv("IS_WORKER"):
893-
# Delete any startup proof left by the previous worker so this worker
894-
# must earn its own — the on_ready callback writes it fresh once the
895-
# gateway is live.
896-
from operator_use.paths import get_userdata_dir as _gud
897-
(_gud() / "startup_ok.json").unlink(missing_ok=True)
898892
from operator_use.agent.tools.builtin.control_center import requested_exit_code
899893
try:
900894
asyncio.run(main())

0 commit comments

Comments
 (0)