Skip to content

Commit db97e65

Browse files
gcmsgclaude
andcommitted
feat: add OpenClaw gateway integration and notification handling
- Add NotificationPayload type and OnNotification callback to Agent - Extend SignalingClient interface with SetNotificationHandler - Handle MessageTypeNotification in signaling client readLoop - Create openclaw package with gateway WebSocket client, frame protocol types, chat send/inject methods, and format utilities - Wire OpenClaw integration in Agent.Start(): outbound handler sends AI responses back via P2P, notifications forwarded to OpenClaw conversations, incoming P2P messages routed to OpenClaw chat Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 0afd9df commit db97e65

8 files changed

Lines changed: 670 additions & 3 deletions

File tree

agent.go

Lines changed: 89 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@ import (
1313
"github.com/peerclaw/peerclaw-agent/conn"
1414
"github.com/peerclaw/peerclaw-agent/discovery"
1515
"github.com/peerclaw/peerclaw-agent/filetransfer"
16+
"github.com/peerclaw/peerclaw-agent/openclaw"
1617
"github.com/peerclaw/peerclaw-agent/peer"
18+
"github.com/peerclaw/peerclaw-agent/sdkversion"
1719
"github.com/peerclaw/peerclaw-agent/security"
1820
pcsignaling "github.com/peerclaw/peerclaw-agent/signaling"
1921
"github.com/peerclaw/peerclaw-agent/transport"
@@ -36,6 +38,19 @@ type ConnectionRequest struct {
3638
// Return true to allow, false to deny.
3739
type ConnectionRequestHandler func(ctx context.Context, req *ConnectionRequest) bool
3840

41+
// NotificationPayload represents a notification pushed from the server via signaling.
42+
type NotificationPayload struct {
43+
ID string `json:"id"`
44+
UserID string `json:"user_id"`
45+
AgentID string `json:"agent_id"`
46+
Type string `json:"type"`
47+
Severity string `json:"severity"`
48+
Title string `json:"title"`
49+
Body string `json:"body"`
50+
Metadata map[string]string `json:"metadata,omitempty"`
51+
CreatedAt time.Time `json:"created_at"`
52+
}
53+
3954
// Options configures an Agent.
4055
type Options struct {
4156
// Name is the agent's display name.
@@ -104,6 +119,11 @@ type Options struct {
104119
// (done, failed, or cancelled).
105120
OnFileTransferComplete func(info filetransfer.TransferInfo)
106121

122+
// OpenClaw holds optional OpenClaw gateway integration settings.
123+
// When set, the agent connects to OpenClaw and forwards P2P messages
124+
// and server notifications to OpenClaw conversations.
125+
OpenClaw *openclaw.Config
126+
107127
// SkipRegistration skips server registration on Start(). Use when the agent
108128
// is already registered and you only need P2P connectivity.
109129
SkipRegistration bool
@@ -132,8 +152,10 @@ type Agent struct {
132152
pendingRequests map[string]chan *envelope.Envelope // traceID → response channel
133153
taskTracker *TaskTracker
134154
router *Router
135-
handler MessageHandler
136-
connRequestHandler ConnectionRequestHandler
155+
handler MessageHandler
156+
connRequestHandler ConnectionRequestHandler
157+
notificationHandler func(n *NotificationPayload)
158+
openclawClient *openclaw.Client
137159
connManager *conn.Manager
138160
mailbox *transport.Mailbox
139161
fileTransfer *filetransfer.Manager
@@ -374,6 +396,21 @@ func (a *Agent) Start(ctx context.Context) error {
374396
a.HandleIncomingEnvelope(context.Background(), &env)
375397
})
376398

399+
// Set up notification handler for server notifications via signaling.
400+
a.signaling.SetNotificationHandler(func(payload []byte) {
401+
var n NotificationPayload
402+
if err := json.Unmarshal(payload, &n); err != nil {
403+
a.logger.Warn("invalid notification payload", "error", err)
404+
return
405+
}
406+
a.mu.RLock()
407+
handler := a.notificationHandler
408+
a.mu.RUnlock()
409+
if handler != nil {
410+
handler(&n)
411+
}
412+
})
413+
377414
// Start connection orchestrator with connection gate.
378415
x25519Pub, _ := a.keypair.X25519PublicKeyString()
379416
a.connManager = conn.New(conn.Config{
@@ -499,6 +536,44 @@ func (a *Agent) Start(ctx context.Context) error {
499536
}
500537
}
501538

539+
// Initialize OpenClaw gateway integration.
540+
if a.opts.OpenClaw != nil {
541+
oc := openclaw.NewClient(*a.opts.OpenClaw, a.agentID, a.opts.Name, sdkversion.Version, a.logger)
542+
oc.SetOutboundHandler(func(sessionKey, text string) {
543+
peerID := openclaw.ParsePeerFromSessionKey(sessionKey)
544+
if peerID == "" {
545+
return
546+
}
547+
env := envelope.New(a.agentID, peerID, "peerclaw", []byte(text))
548+
_ = a.Send(context.Background(), env)
549+
})
550+
if err := oc.Connect(ctx); err != nil {
551+
a.logger.Warn("openclaw connect failed", "error", err)
552+
} else {
553+
a.openclawClient = oc
554+
}
555+
556+
// Forward notifications to OpenClaw.
557+
if a.openclawClient != nil {
558+
a.OnNotification(func(n *NotificationPayload) {
559+
text := openclaw.FormatNotification(n.Severity, n.Title, n.Body)
560+
_ = a.openclawClient.ChatInject(ctx, "peerclaw:notifications", text, "peerclaw-notification")
561+
})
562+
}
563+
564+
// Forward P2P messages to OpenClaw.
565+
if a.openclawClient != nil {
566+
prevHandler := a.handler
567+
a.OnMessage(func(msgCtx context.Context, env *envelope.Envelope) {
568+
sessionKey := openclaw.SessionKeyForPeer(env.Source)
569+
_ = a.openclawClient.ChatSend(msgCtx, sessionKey, string(env.Payload))
570+
if prevHandler != nil {
571+
prevHandler(msgCtx, env)
572+
}
573+
})
574+
}
575+
}
576+
502577
a.logger.Info("agent started", "id", a.agentID, "name", a.opts.Name, "pubkey", a.keypair.PublicKeyString())
503578
return nil
504579
}
@@ -555,6 +630,11 @@ func (a *Agent) Stop(ctx context.Context) error {
555630
}
556631
}
557632

633+
// Close OpenClaw gateway.
634+
if a.openclawClient != nil {
635+
a.openclawClient.Close()
636+
}
637+
558638
// Close signaling and peers.
559639
a.signaling.Close()
560640
a.peerManager.Close()
@@ -727,6 +807,13 @@ func (a *Agent) OnMessage(handler MessageHandler) {
727807
a.handler = handler
728808
}
729809

810+
// OnNotification registers a handler for server notifications pushed via signaling.
811+
func (a *Agent) OnNotification(handler func(n *NotificationPayload)) {
812+
a.mu.Lock()
813+
defer a.mu.Unlock()
814+
a.notificationHandler = handler
815+
}
816+
730817
// Handle registers a capability handler on the router.
731818
func (a *Agent) Handle(capability string, handler HandlerFunc) {
732819
a.router.Handle(capability, handler)

conn/manager_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ func (m *mockSignalingClient) ICEServers() []pcsignaling.ICEServerConfig {
6464

6565
func (m *mockSignalingClient) SetBridgeHandler(_ signaling.BridgeMessageHandler) {}
6666

67+
func (m *mockSignalingClient) SetNotificationHandler(_ func(payload []byte)) {}
68+
6769
func (m *mockSignalingClient) SetAgentID(id string) {
6870
m.mu.Lock()
6971
defer m.mu.Unlock()

0 commit comments

Comments
 (0)