Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,18 +138,21 @@ func runStart(cmd *cobra.Command, args []string) error {
log.Printf("Image save directory: %s", cfg.SaveDir)
}

// Start default agent initialization in background so monitors can start immediately
// Start default agent initialization in background so monitors can start immediately.
// Capture the configured name now so later config mutations do not register the
// started agent under the wrong key.
initialDefaultAgent := cfg.DefaultAgent
go func() {
if cfg.DefaultAgent == "" {
if initialDefaultAgent == "" {
log.Println("No default agent configured, staying in echo mode")
return
}
log.Printf("Initializing default agent %q in background...", cfg.DefaultAgent)
ag := createAgentByName(ctx, cfg, cfg.DefaultAgent)
log.Printf("Initializing default agent %q in background...", initialDefaultAgent)
ag := createAgentByName(ctx, cfg, initialDefaultAgent)
if ag == nil {
log.Printf("Failed to initialize default agent %q, staying in echo mode", cfg.DefaultAgent)
log.Printf("Failed to initialize default agent %q, staying in echo mode", initialDefaultAgent)
} else {
handler.SetDefaultAgent(cfg.DefaultAgent, ag)
handler.SetDefaultAgent(initialDefaultAgent, ag)
}
}()

Expand Down
35 changes: 27 additions & 8 deletions messaging/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ type Handler struct {
customAliases map[string]string // custom alias -> agent name (from config)
factory AgentFactory
saveDefault SaveDefaultFunc
contextTokens sync.Map // map[userID]contextToken
saveDir string // directory to save images/files to
seenMsgs sync.Map // map[int64]time.Time — dedup by message_id
contextTokens sync.Map // map[userID]contextToken
saveDir string // directory to save images/files to
seenMsgs sync.Map // map[int64]time.Time — dedup by message_id
}

// NewHandler creates a new message handler.
Expand Down Expand Up @@ -95,13 +95,21 @@ func (h *Handler) SetAgentWorkDirs(workDirs map[string]string) {
}
}

// SetDefaultAgent sets the default agent (already started).
// SetDefaultAgent registers an already-started agent. It only takes over as
// the default when no default is set yet, or when the same agent name is still
// the current default. This avoids background startup races from clobbering a
// newer user-selected default.
func (h *Handler) SetDefaultAgent(name string, ag agent.Agent) {
h.mu.Lock()
defer h.mu.Unlock()
h.defaultName = name
h.agents[name] = ag
log.Printf("[handler] default agent ready: %s (%s)", name, ag.Info())
if h.defaultName == "" || h.defaultName == name {
h.defaultName = name
log.Printf("[handler] default agent ready: %s (%s)", name, ag.Info())
return
}

log.Printf("[handler] agent ready without switching default: started=%s current_default=%s (%s)", name, h.defaultName, ag.Info())
}

// getAgent returns a running agent by name, or starts it on demand via factory.
Expand Down Expand Up @@ -344,7 +352,7 @@ func (h *Handler) HandleMessage(ctx context.Context, client *ilink.Client, msg i
}
return
} else if strings.HasPrefix(trimmed, "/cwd") {
reply := h.handleCwd(trimmed)
reply := h.handleCwd(ctx, msg.FromUserID, trimmed)
if err := SendTextReply(ctx, client, msg.FromUserID, reply, msg.ContextToken, clientID); err != nil {
log.Printf("[handler] failed to send reply to %s: %v", msg.FromUserID, err)
}
Expand Down Expand Up @@ -603,7 +611,7 @@ func (h *Handler) resetDefaultSession(ctx context.Context, userID string) string
}

// handleCwd handles the /cwd command. It updates the working directory for all running agents.
func (h *Handler) handleCwd(trimmed string) string {
func (h *Handler) handleCwd(ctx context.Context, userID string, trimmed string) string {
arg := strings.TrimSpace(strings.TrimPrefix(trimmed, "/cwd"))
if arg == "" {
// No path provided — show current cwd of default agent
Expand Down Expand Up @@ -654,9 +662,20 @@ func (h *Handler) handleCwd(trimmed string) string {
for name, ag := range agents {
ag.SetCwd(absPath)
log.Printf("[handler] updated cwd for agent %s: %s", name, absPath)
if userID == "" {
continue
}
resetCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
if _, err := ag.ResetSession(resetCtx, userID); err != nil {
log.Printf("[handler] failed to reset session after cwd update for agent %s: %v", name, err)
}
cancel()
}

h.mu.Lock()
if h.agentWorkDirs == nil {
h.agentWorkDirs = make(map[string]string, len(agents))
}
for name := range agents {
h.agentWorkDirs[name] = absPath
}
Expand Down
68 changes: 68 additions & 0 deletions messaging/handler_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,40 @@
package messaging

import (
"context"
"strings"
"testing"

"github.com/fastclaw-ai/weclaw/agent"
)

type fakeAgent struct {
resetConversationIDs []string
lastCwd string
infoName string
}

func (f *fakeAgent) Chat(context.Context, string, string) (string, error) {
return "", nil
}

func (f *fakeAgent) ResetSession(_ context.Context, conversationID string) (string, error) {
f.resetConversationIDs = append(f.resetConversationIDs, conversationID)
return "reset-session", nil
}

func (f *fakeAgent) Info() agent.AgentInfo {
name := f.infoName
if name == "" {
name = "fake"
}
return agent.AgentInfo{Name: name}
}

func (f *fakeAgent) SetCwd(cwd string) {
f.lastCwd = cwd
}

func newTestHandler() *Handler {
return &Handler{agents: make(map[string]agent.Agent)}
}
Expand Down Expand Up @@ -138,3 +166,43 @@ func TestBuildHelpText(t *testing.T) {
t.Error("help text should mention /help")
}
}

func TestHandleCwdResetsRunningAgentSession(t *testing.T) {
tempDir := t.TempDir()
h := newTestHandler()
ag := &fakeAgent{}
h.agents["copilot"] = ag

reply := h.handleCwd(context.Background(), "wechat-user", "/cwd "+tempDir)

if reply != "cwd: "+tempDir {
t.Fatalf("handleCwd() reply = %q, want %q", reply, "cwd: "+tempDir)
}
if ag.lastCwd != tempDir {
t.Fatalf("agent cwd = %q, want %q", ag.lastCwd, tempDir)
}
if len(ag.resetConversationIDs) != 1 || ag.resetConversationIDs[0] != "wechat-user" {
t.Fatalf("reset conversation IDs = %v, want [wechat-user]", ag.resetConversationIDs)
}
}

func TestSetDefaultAgentDoesNotClobberNewerDefault(t *testing.T) {
h := newTestHandler()
codex := &fakeAgent{infoName: "codex"}
copilot := &fakeAgent{infoName: "copilot"}

h.defaultName = "codex"
h.agents["codex"] = codex

h.SetDefaultAgent("copilot", copilot)

if h.defaultName != "codex" {
t.Fatalf("defaultName = %q, want %q", h.defaultName, "codex")
}
if got := h.agents["codex"]; got != codex {
t.Fatalf("codex agent overwritten: got %#v, want %#v", got, codex)
}
if got := h.agents["copilot"]; got != copilot {
t.Fatalf("copilot agent not registered: got %#v, want %#v", got, copilot)
}
}
63 changes: 63 additions & 0 deletions messaging/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,25 @@ import (
"context"
"fmt"
"log"
"sync"
"time"

"github.com/fastclaw-ai/weclaw/ilink"
"github.com/google/uuid"
)

const typingTicketTTL = 2 * time.Minute

type typingTicketCacheEntry struct {
ticket string
expiresAt time.Time
}

var (
typingTicketCache sync.Map
nowFunc = time.Now
)
Comment on lines +14 to +24
Copy link

Copilot AI Apr 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typingTicketCache is an unbounded global cache keyed by userID, and expired entries are only removed on a subsequent read for that same user (getCachedTypingTicket deletes on access). If many unique userIDs interact once, this can grow indefinitely and retain expired tickets forever. Consider adding bounded eviction and/or periodic cleanup (e.g., opportunistic Range cleanup every N stores, or a small TTL cache with max size) so memory usage can’t grow without limit.

Copilot uses AI. Check for mistakes.

// NewClientID generates a new unique client ID for message correlation.
func NewClientID() string {
return uuid.New().String()
Expand All @@ -17,11 +31,22 @@ func NewClientID() string {
// SendTypingState sends a typing indicator to a user via the iLink sendtyping API.
// It first fetches a typing_ticket via getconfig, then sends the typing status.
func SendTypingState(ctx context.Context, client *ilink.Client, userID, contextToken string) error {
if cachedTicket := getCachedTypingTicket(userID); cachedTicket != "" {
if err := client.SendTyping(ctx, userID, cachedTicket, ilink.TypingStatusTyping); err == nil {
log.Printf("[sender] sent typing indicator to %s", userID)
return nil
}
invalidateCachedTypingTicket(userID)
}

// Get typing ticket
configResp, err := client.GetConfig(ctx, userID, contextToken)
if err != nil {
return fmt.Errorf("get config for typing: %w", err)
}
if configResp.Ret != 0 {
return fmt.Errorf("get config failed: ret=%d errmsg=%s", configResp.Ret, configResp.ErrMsg)
}
if configResp.TypingTicket == "" {
return fmt.Errorf("no typing_ticket returned from getconfig")
}
Expand All @@ -30,11 +55,49 @@ func SendTypingState(ctx context.Context, client *ilink.Client, userID, contextT
if err := client.SendTyping(ctx, userID, configResp.TypingTicket, ilink.TypingStatusTyping); err != nil {
return fmt.Errorf("send typing: %w", err)
}
cacheTypingTicket(userID, configResp.TypingTicket)

log.Printf("[sender] sent typing indicator to %s", userID)
return nil
}

func getCachedTypingTicket(userID string) string {
if userID == "" {
return ""
}

v, ok := typingTicketCache.Load(userID)
if !ok {
return ""
}

entry, ok := v.(typingTicketCacheEntry)
if !ok || entry.ticket == "" || !nowFunc().Before(entry.expiresAt) {
typingTicketCache.Delete(userID)
return ""
}

return entry.ticket
}

func cacheTypingTicket(userID, ticket string) {
if userID == "" || ticket == "" {
return
}

typingTicketCache.Store(userID, typingTicketCacheEntry{
ticket: ticket,
expiresAt: nowFunc().Add(typingTicketTTL),
})
}

func invalidateCachedTypingTicket(userID string) {
if userID == "" {
return
}
typingTicketCache.Delete(userID)
}

// SendTextReply sends a text reply to a user through the iLink API.
// If clientID is empty, a new one is generated.
func SendTextReply(ctx context.Context, client *ilink.Client, toUserID, text, contextToken, clientID string) error {
Expand Down
59 changes: 59 additions & 0 deletions messaging/sender_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package messaging

import (
"testing"
"time"
)

func clearTypingTicketCache() {
typingTicketCache.Range(func(key, _ any) bool {
typingTicketCache.Delete(key)
return true
})
}

func TestTypingTicketCacheHit(t *testing.T) {
clearTypingTicketCache()
defer clearTypingTicketCache()

originalNow := nowFunc
defer func() { nowFunc = originalNow }()

baseTime := time.Date(2026, 4, 7, 2, 0, 0, 0, time.UTC)
nowFunc = func() time.Time { return baseTime }

cacheTypingTicket("user-1", "ticket-1")

if got := getCachedTypingTicket("user-1"); got != "ticket-1" {
t.Fatalf("getCachedTypingTicket() = %q, want %q", got, "ticket-1")
}
}

func TestTypingTicketCacheExpires(t *testing.T) {
clearTypingTicketCache()
defer clearTypingTicketCache()

originalNow := nowFunc
defer func() { nowFunc = originalNow }()

baseTime := time.Date(2026, 4, 7, 2, 0, 0, 0, time.UTC)
nowFunc = func() time.Time { return baseTime }
cacheTypingTicket("user-2", "ticket-2")

nowFunc = func() time.Time { return baseTime.Add(typingTicketTTL + time.Second) }
if got := getCachedTypingTicket("user-2"); got != "" {
t.Fatalf("getCachedTypingTicket() after expiry = %q, want empty string", got)
}
}

func TestInvalidateTypingTicketCache(t *testing.T) {
clearTypingTicketCache()
defer clearTypingTicketCache()

cacheTypingTicket("user-3", "ticket-3")
invalidateCachedTypingTicket("user-3")

if got := getCachedTypingTicket("user-3"); got != "" {
t.Fatalf("getCachedTypingTicket() after invalidate = %q, want empty string", got)
}
}