Skip to content

Commit e87c82e

Browse files
Jeomonclaude
andcommitted
Add /start, /stop, /restart session control commands across all channels
- /start: creates a new session or reports one is already active - /stop: archives the current session (timestamped JSONL) and frees the slot - /restart: replaces the process via os.execv for a full system reboot - Commands intercepted in Orchestrator before reaching the agent (no LLM call) - Command logic extracted to orchestrator/commands.py with COMMANDS set for future extensibility - SessionStore.archive() added to rename active session file with timestamp suffix - All channels (Telegram, Discord, Slack, Twitch) detect /start /stop /restart and set _command metadata - Telegram /new kept as alias for /stop; Twitch uses / prefix instead of ! - 31 tests added covering archive, all command flows, routing, and edge cases Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent ff8e0bf commit e87c82e

10 files changed

Lines changed: 464 additions & 16 deletions

File tree

operator_use/gateway/channels/discord.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -290,8 +290,6 @@ async def _on_message(self, message: discord.Message) -> None:
290290
content = "\n".join(content_parts) if content_parts else "[empty message]"
291291
str_channel_id = str(channel_id)
292292

293-
self._start_typing(channel_id)
294-
295293
metadata = {
296294
"message_id": message.id,
297295
"user_id": author_id,
@@ -300,6 +298,22 @@ async def _on_message(self, message: discord.Message) -> None:
300298
"channel_type": type(message.channel).__name__,
301299
}
302300

301+
# Session control commands — no typing indicator, no agent call
302+
if not media_paths and content.strip().lower() in ("/start", "/stop", "/restart"):
303+
command = content.strip()[1:].lower()
304+
incoming = IncomingMessage(
305+
channel=self.name,
306+
chat_id=str_channel_id,
307+
parts=[TextPart(content=content.strip())],
308+
user_id=str(author_id),
309+
account_id=self._cfg("account_id") or "",
310+
metadata={**metadata, "_command": command},
311+
)
312+
await self.receive(incoming)
313+
return
314+
315+
self._start_typing(channel_id)
316+
303317
if media_paths:
304318
content, parts = await self._process_media_to_parts(
305319
content, media_paths, [self._get_media_type(p) for p in media_paths]

operator_use/gateway/channels/slack.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,20 @@ async def _handle_webhook_event(self, event: dict) -> None:
368368
"user_id": sender_id,
369369
}
370370

371+
# Session control commands
372+
if text and text.strip().lower() in ("/start", "/stop", "/restart"):
373+
command = text.strip()[1:].lower()
374+
incoming = IncomingMessage(
375+
channel=self.name,
376+
chat_id=channel_id,
377+
parts=[TextPart(content=text.strip())],
378+
user_id=sender_id,
379+
account_id=self._cfg("account_id") or "",
380+
metadata={**metadata, "_command": command},
381+
)
382+
await self.receive(incoming)
383+
return
384+
371385
# Create incoming message
372386
incoming = IncomingMessage(
373387
channel=self.name,
@@ -447,6 +461,20 @@ async def _on_socket_request(self, client, req: SocketModeRequest):
447461
"user_id": sender_id,
448462
}
449463

464+
# Session control commands
465+
if text and text.strip().lower() in ("/start", "/stop", "/restart"):
466+
command = text.strip()[1:].lower()
467+
incoming = IncomingMessage(
468+
channel=self.name,
469+
chat_id=channel_id,
470+
parts=[TextPart(content=text.strip())],
471+
user_id=sender_id,
472+
account_id=self._cfg("account_id") or "",
473+
metadata={**metadata, "_command": command},
474+
)
475+
await self.receive(incoming)
476+
return
477+
450478
incoming = IncomingMessage(
451479
channel=self.name,
452480
chat_id=channel_id,

operator_use/gateway/channels/telegram.py

Lines changed: 36 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -158,9 +158,9 @@ class TelegramChannel(BaseChannel):
158158
"""Telegram channel using long-polling or webhook."""
159159

160160
BOT_COMMANDS = [
161-
BotCommand("start", "Start the bot"),
162-
BotCommand("new", "Start a new conversation"),
163-
BotCommand("stop", "Stop the current task"),
161+
BotCommand("start", "Start a new session"),
162+
BotCommand("stop", "Stop and save the current session"),
163+
BotCommand("restart", "Reboot the entire system"),
164164
BotCommand("help", "Show available commands"),
165165
]
166166

@@ -280,7 +280,9 @@ async def _post_init(app):
280280
self._app = builder.build()
281281

282282
self._app.add_handler(CommandHandler("start", self._on_start))
283-
self._app.add_handler(CommandHandler("new", self._on_new))
283+
self._app.add_handler(CommandHandler("stop", self._on_stop))
284+
self._app.add_handler(CommandHandler("restart", self._on_restart))
285+
self._app.add_handler(CommandHandler("new", self._on_stop)) # /new kept as stop alias
284286
self._app.add_handler(MessageHandler(
285287
(
286288
filters.TEXT|
@@ -848,7 +850,7 @@ async def _typing_loop(self, chat_id: str) -> None:
848850
logger.debug("Typing indicator stopped for %s: %s", chat_id, e)
849851

850852
async def _on_start(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
851-
"""Handle /start — welcome in DM, register group for setup-channels."""
853+
"""Handle /start — register group for setup-channels in groups, start session in DMs."""
852854
if not update.message or not update.effective_chat:
853855
return
854856
chat = update.effective_chat
@@ -870,14 +872,35 @@ async def _on_start(self, update: Update, context: ContextTypes.DEFAULT_TYPE) ->
870872
except Exception as e:
871873
logger.error("Failed to register group: %s", e)
872874
else:
873-
user = update.effective_user
874-
name = user.first_name if user else "there"
875-
await update.message.reply_text(
876-
f"Hi {name}! I'm your AI agent. Send me a message to get started."
875+
thread_id = update.message.message_thread_id
876+
chat_id = self._build_chat_id(str(chat.id), thread_id)
877+
incoming = IncomingMessage(
878+
channel=self.name,
879+
chat_id=chat_id,
880+
parts=[TextPart(content="/start")],
881+
user_id=str(update.effective_user.id) if update.effective_user else "",
882+
metadata={"_command": "start", "thread_id": thread_id},
877883
)
884+
await self.receive(incoming)
885+
886+
async def _on_stop(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
887+
"""Handle /stop — archive the current session."""
888+
if not update.message or not update.effective_chat:
889+
return
890+
chat = update.effective_chat
891+
thread_id = update.message.message_thread_id
892+
chat_id = self._build_chat_id(str(chat.id), thread_id)
893+
incoming = IncomingMessage(
894+
channel=self.name,
895+
chat_id=chat_id,
896+
parts=[TextPart(content="/stop")],
897+
user_id=str(update.effective_user.id) if update.effective_user else "",
898+
metadata={"_command": "stop", "thread_id": thread_id},
899+
)
900+
await self.receive(incoming)
878901

879-
async def _on_new(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
880-
"""Handle /new — clear session and start fresh."""
902+
async def _on_restart(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
903+
"""Handle /restart (and /new alias) — clear session history and start fresh."""
881904
if not update.message or not update.effective_chat:
882905
return
883906
chat = update.effective_chat
@@ -886,9 +909,9 @@ async def _on_new(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> N
886909
incoming = IncomingMessage(
887910
channel=self.name,
888911
chat_id=chat_id,
889-
parts=[TextPart(content="/new")],
912+
parts=[TextPart(content="/restart")],
890913
user_id=str(update.effective_user.id) if update.effective_user else "",
891-
metadata={"_command": "new", "thread_id": thread_id},
914+
metadata={"_command": "restart", "thread_id": thread_id},
892915
)
893916
await self.receive(incoming)
894917

operator_use/gateway/channels/twitch.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,21 @@ async def _on_message(self, message: twitchio.Message) -> None:
144144

145145
chat_id = channel_name # use channel name as chat_id (Twitch has one chat per channel)
146146

147+
# Session control commands always use / prefix (e.g. /start, /stop, /restart)
148+
lower_content = content.strip().lower()
149+
for cmd in ("start", "stop", "restart"):
150+
if lower_content == f"/{cmd}":
151+
incoming = IncomingMessage(
152+
channel=self.name,
153+
chat_id=chat_id,
154+
parts=[TextPart(content=content.strip())],
155+
user_id=author_name,
156+
account_id=self._cfg("account_id") or "",
157+
metadata={"username": author_name, "channel": channel_name, "_command": cmd},
158+
)
159+
await self.receive(incoming)
160+
return
161+
147162
incoming = IncomingMessage(
148163
channel=self.name,
149164
chat_id=chat_id,
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""Orchestrator: pipeline layer between channels and agents."""
22

33
from operator_use.orchestrator.service import Orchestrator
4+
from operator_use.orchestrator.commands import COMMANDS
45

5-
__all__ = ["Orchestrator"]
6+
__all__ = ["Orchestrator", "COMMANDS"]
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
"""Session and system control commands (/start, /stop, /restart, ...)."""
2+
3+
import asyncio
4+
import logging
5+
import os
6+
import sys
7+
from typing import TYPE_CHECKING
8+
9+
from operator_use.bus.views import IncomingMessage, OutgoingMessage, TextPart
10+
11+
if TYPE_CHECKING:
12+
from operator_use.bus import Bus
13+
from operator_use.agent.service import Agent
14+
15+
logger = logging.getLogger(__name__)
16+
17+
# All recognised command names. Channels use this to detect commands.
18+
COMMANDS = {"start", "stop", "restart"}
19+
20+
21+
async def handle_command(
22+
message: IncomingMessage,
23+
agent: "Agent",
24+
bus: "Bus",
25+
) -> None:
26+
"""Dispatch a session/system control command and publish the response."""
27+
command = (message.metadata or {}).get("_command")
28+
session_id = f"{message.channel}:{message.chat_id}"
29+
30+
if command == "start":
31+
text = await _cmd_start(session_id, agent)
32+
elif command == "stop":
33+
text = await _cmd_stop(session_id, agent)
34+
elif command == "restart":
35+
text = await _cmd_restart()
36+
else:
37+
logger.warning("handle_command called with unknown command: %r", command)
38+
return
39+
40+
await bus.publish_outgoing(
41+
OutgoingMessage(
42+
chat_id=message.chat_id,
43+
channel=message.channel,
44+
account_id=message.account_id,
45+
parts=[TextPart(content=text)],
46+
metadata=message.metadata,
47+
reply=True,
48+
)
49+
)
50+
51+
if command == "restart":
52+
asyncio.create_task(_deferred_process_restart())
53+
54+
55+
# ---------------------------------------------------------------------------
56+
# Individual command implementations
57+
# ---------------------------------------------------------------------------
58+
59+
async def _cmd_start(session_id: str, agent: "Agent") -> str:
60+
existing = agent.sessions.load(session_id)
61+
if existing and existing.messages:
62+
return "Session is already active. Use /stop to end the session."
63+
return "Session started! Send me a message to get started."
64+
65+
66+
async def _cmd_stop(session_id: str, agent: "Agent") -> str:
67+
agent.sessions.archive(session_id)
68+
return "Session stopped and saved. Use /start to begin a new session."
69+
70+
71+
async def _cmd_restart() -> str:
72+
return "Restarting system..."
73+
74+
75+
async def _deferred_process_restart(delay: float = 1.5) -> None:
76+
"""Wait for the response to be delivered, then replace the process."""
77+
await asyncio.sleep(delay)
78+
logger.info("Restarting process: %s %s", sys.executable, sys.argv)
79+
os.execv(sys.executable, [sys.executable] + sys.argv)

operator_use/orchestrator/service.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,12 @@ async def _build_outgoing_message(
230230
# Message handling
231231
# ------------------------------------------------------------------
232232

233+
async def _handle_command(self, message: IncomingMessage) -> None:
234+
"""Handle session/system control commands without running the agent."""
235+
from operator_use.orchestrator.commands import handle_command
236+
agent = self._resolve_agent(message)
237+
await handle_command(message, agent, self.bus)
238+
233239
async def _handle_message(self, request_message: IncomingMessage) -> None:
234240
"""Process one incoming message end-to-end."""
235241
session_id = f"{request_message.channel}:{request_message.chat_id}"
@@ -376,6 +382,11 @@ async def ainvoke(self) -> None:
376382
await agent._handle_reaction(request_message)
377383
continue
378384

385+
# Session control commands: handle without running the agent
386+
if request_message.metadata and request_message.metadata.get("_command") in ("start", "stop", "restart"):
387+
await self._handle_command(request_message)
388+
continue
389+
379390
# Pending reply: a tool is waiting for the user's next message
380391
if session_id in self._pending_replies:
381392
future = self._pending_replies.pop(session_id)

operator_use/session/service.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,21 @@ def delete(self, session_id: str) -> bool:
9191
return True
9292
return False
9393

94+
def archive(self, session_id: str) -> bool:
95+
"""Archive a session by renaming its file with a timestamp suffix. Returns True if existed.
96+
97+
The active session slot is freed so the next get_or_create starts fresh.
98+
"""
99+
path = self._sessions_path(session_id)
100+
if session_id in self._sessions:
101+
del self._sessions[session_id]
102+
if path.exists():
103+
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
104+
archive_name = f"{self._session_id_to_filename(session_id)}_archived_{timestamp}.jsonl"
105+
path.rename(self.sessions_dir / archive_name)
106+
return True
107+
return False
108+
94109
def list_sessions(self) -> list[dict[str, Any]]:
95110
"""Load sessions from the sessions directory. Returns list of dicts with id, created_at, updated_at, path."""
96111
result: list[dict[str, Any]] = []

0 commit comments

Comments
 (0)