Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions backend/internal/cdc/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ import (
type EventType string

// CDC event type names. Each constant is declared exactly once; the
// notification pair covers creation and update of persisted notifications.
const (
	EventSessionCreated      EventType = "session_created"
	EventSessionUpdated      EventType = "session_updated"
	EventPRCreated           EventType = "pr_created"
	EventPRUpdated           EventType = "pr_updated"
	EventPRCheckRecorded     EventType = "pr_check_recorded"
	EventNotificationCreated EventType = "notification_created"
	EventNotificationUpdated EventType = "notification_updated"
)

// Event is one CDC change read from change_log. Seq is the monotonic ordering +
Expand Down
44 changes: 44 additions & 0 deletions backend/internal/domain/notification.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package domain

import (
"encoding/json"
"time"
)

// NotificationID is the stable public identifier for a persisted notification.
// It is a distinct string type, so converting to or from a plain string (or
// another ID type) requires an explicit conversion.
type NotificationID string

// Notification is the provider-neutral durable notification read model. It is
// sink-agnostic: desktop, dashboard, Slack, webhooks, etc. all consume the same
// semantic payload and actions later.
type Notification struct {
	// NOTE(review): Seq is presumably the storage-assigned ordering sequence — confirm.
	Seq int64
	// ID is the stable public identifier of the notification.
	ID        NotificationID
	ProjectID ProjectID
	SessionID SessionID
	Source    string
	EventType string
	// SemanticType is the sink-independent notification type string.
	SemanticType string
	Priority     string
	Message      string
	// Payload is kept as raw JSON; this type does not decode it.
	Payload json.RawMessage
	// Actions are the provider-neutral action descriptors attached to the notification.
	Actions []NotificationAction
	// NOTE(review): DedupeKey is presumably the idempotency key used to
	// suppress duplicate notifications — confirm against the store.
	DedupeKey string
	CauseKey  string
	// NOTE(review): a zero ReadAt / ArchivedAt presumably means "not read" /
	// "not archived" — confirm against the storage layer.
	ReadAt     time.Time
	ArchivedAt time.Time
	CreatedAt  time.Time
	UpdatedAt  time.Time
}

// NotificationAction is a provider-neutral action descriptor. Renderers may use
// Route for app-local navigation, URL for external navigation, or Method for a
// future command/action endpoint.
//
// ID, Kind and Label are always present in the JSON encoding; Route, URL and
// Method are omitted from it when empty.
type NotificationAction struct {
	ID     string `json:"id"`
	Kind   string `json:"kind"`
	Label  string `json:"label"`
	Route  string `json:"route,omitempty"`
	URL    string `json:"url,omitempty"`
	Method string `json:"method,omitempty"`
}
139 changes: 139 additions & 0 deletions backend/internal/integration/lifecycle_sqlite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ package integration

import (
"context"
"io"
"log/slog"
"path/filepath"
"strings"
"sync"
Expand All @@ -16,6 +18,7 @@ import (
"github.com/aoagents/agent-orchestrator/backend/internal/cdc"
"github.com/aoagents/agent-orchestrator/backend/internal/domain"
"github.com/aoagents/agent-orchestrator/backend/internal/lifecycle"
"github.com/aoagents/agent-orchestrator/backend/internal/notification"
"github.com/aoagents/agent-orchestrator/backend/internal/ports"
"github.com/aoagents/agent-orchestrator/backend/internal/session"
"github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite"
Expand Down Expand Up @@ -268,6 +271,34 @@ func seedProject(t *testing.T, store *sqlite.Store, id string) {
}
}

// durableLifecycle builds a lifecycle manager for tests whose notifier is the
// notification enqueuer backed by store (rendering through a renderer on the
// same store) and whose log output is discarded. The store adapter is passed
// for both of lifecycle.New's first two dependencies.
func durableLifecycle(store *sqlite.Store, messenger ports.AgentMessenger) *lifecycle.Manager {
	// Tests do not inspect log output, so route it to io.Discard.
	quiet := slog.New(slog.NewTextHandler(io.Discard, nil))
	enqueuer := notification.NewEnqueuer(store, notification.NewRenderer(store), quiet)

	persistence := storeAdapter{store}
	return lifecycle.New(persistence, persistence, enqueuer, messenger)
}

// durableRecord returns a worker session record for the given project, issue
// and branch: session state "working", alive, with active hook-sourced
// activity. All timestamps share one UTC instant truncated to the second.
func durableRecord(project, issue, branch string) domain.SessionRecord {
	stamp := time.Now().UTC().Truncate(time.Second)

	var rec domain.SessionRecord
	rec.ProjectID = domain.ProjectID(project)
	rec.IssueID = domain.IssueID(issue)
	rec.Kind = domain.KindWorker

	rec.Lifecycle.Version = domain.LifecycleVersion
	rec.Lifecycle.Session = domain.SessionSubstate{State: domain.SessionWorking}
	rec.Lifecycle.IsAlive = true
	rec.Lifecycle.Activity = domain.ActivitySubstate{
		State:          domain.ActivityActive,
		LastActivityAt: stamp,
		Source:         domain.SourceHook,
	}

	rec.Metadata = domain.SessionMetadata{
		Branch:        branch,
		WorkspacePath: "/workspace/" + branch,
	}
	rec.CreatedAt = stamp
	rec.UpdatedAt = stamp
	return rec
}

// ---- tests ----

// TestHappyPath drives Spawn -> SCM PR observation (open + CI passing) -> Kill,
Expand Down Expand Up @@ -653,6 +684,114 @@ func TestCDCPollerReceivesAllStages(t *testing.T) {
}
}

// TestLifecycleDurableNotification_NeedsInput applies a waiting-input activity
// signal through a store-backed lifecycle manager and checks that exactly one
// "session.needs_input" notification with a non-empty dedupe key is persisted
// for the session, and that a notification_created CDC row appears after the
// change_log sequence captured before the signal.
func TestLifecycleDurableNotification_NeedsInput(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	store, err := sqlite.Open(t.TempDir())
	if err != nil {
		t.Fatalf("open sqlite: %v", err)
	}
	defer store.Close()
	seedProject(t, store, "mer")
	rec, err := store.CreateSession(ctx, durableRecord("mer", "MER-1", "feat/input"))
	if err != nil {
		t.Fatalf("create session: %v", err)
	}
	lcm := durableLifecycle(store, &captureMessenger{})

	// Fail fast if the baseline cannot be read: a silently-zero startSeq would
	// let CDC rows written before the signal satisfy the assertion below.
	startSeq, err := store.MaxChangeLogSeq(ctx)
	if err != nil {
		t.Fatalf("max change_log seq: %v", err)
	}

	if err := lcm.ApplyActivitySignal(ctx, rec.ID, ports.ActivitySignal{
		Valid: true, State: domain.ActivityWaitingInput, Source: domain.SourceHook, Timestamp: time.Now(),
	}); err != nil {
		t.Fatalf("activity: %v", err)
	}

	notifications, err := store.ListNotifications(ctx, sqlite.NotificationFilter{SessionID: string(rec.ID), Limit: 10})
	if err != nil {
		t.Fatalf("list notifications: %v", err)
	}
	if len(notifications) != 1 || notifications[0].SemanticType != "session.needs_input" || notifications[0].DedupeKey == "" {
		t.Fatalf("needs_input notification missing: %+v", notifications)
	}
	assertNotificationCreatedCDC(t, store, startSeq)
}

// TestLifecycleDurableNotification_ApprovedAndGreen feeds an approved,
// CI-passing, mergeable PR observation to a store-backed lifecycle manager and
// expects exactly one persisted "merge.ready" notification for the session.
func TestLifecycleDurableNotification_ApprovedAndGreen(t *testing.T) {
	t.Parallel()

	ctx := context.Background()
	db, openErr := sqlite.Open(t.TempDir())
	if openErr != nil {
		t.Fatalf("open sqlite: %v", openErr)
	}
	defer db.Close()

	seedProject(t, db, "mer")
	session, createErr := db.CreateSession(ctx, durableRecord("mer", "MER-2", "feat/green"))
	if createErr != nil {
		t.Fatalf("create session: %v", createErr)
	}
	manager := durableLifecycle(db, &captureMessenger{})

	observation := ports.PRObservation{
		Fetched:      true,
		URL:          "https://github.com/org/repo/pull/2",
		Number:       2,
		CI:           domain.CIPassing,
		Review:       domain.ReviewApproved,
		Mergeability: domain.MergeMergeable,
	}
	if applyErr := manager.ApplyPRObservation(ctx, session.ID, observation); applyErr != nil {
		t.Fatalf("apply pr: %v", applyErr)
	}

	filter := sqlite.NotificationFilter{SessionID: string(session.ID), Limit: 10}
	got, listErr := db.ListNotifications(ctx, filter)
	if listErr != nil {
		t.Fatalf("list notifications: %v", listErr)
	}
	// Success is exactly one notification of the expected semantic type.
	if len(got) == 1 && got[0].SemanticType == "merge.ready" {
		return
	}
	t.Fatalf("approved-and-green notification missing: %+v", got)
}

// TestLifecycleDurableNotification_PRMerged feeds a merged PR observation to a
// store-backed lifecycle manager and checks that exactly one "pr.merged"
// notification is persisted for the session, and that a notification_created
// CDC row appears after the change_log sequence captured before the
// observation.
func TestLifecycleDurableNotification_PRMerged(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	store, err := sqlite.Open(t.TempDir())
	if err != nil {
		t.Fatalf("open sqlite: %v", err)
	}
	defer store.Close()
	seedProject(t, store, "mer")
	rec, err := store.CreateSession(ctx, durableRecord("mer", "MER-3", "feat/merge"))
	if err != nil {
		t.Fatalf("create session: %v", err)
	}
	lcm := durableLifecycle(store, &captureMessenger{})

	// Fail fast if the baseline cannot be read: a silently-zero startSeq would
	// let CDC rows written before the observation satisfy the assertion below.
	startSeq, err := store.MaxChangeLogSeq(ctx)
	if err != nil {
		t.Fatalf("max change_log seq: %v", err)
	}

	if err := lcm.ApplyPRObservation(ctx, rec.ID, ports.PRObservation{
		Fetched: true, URL: "https://github.com/org/repo/pull/3", Number: 3, Merged: true,
		CI: domain.CIPassing, Review: domain.ReviewApproved, Mergeability: domain.MergeMergeable,
	}); err != nil {
		t.Fatalf("apply pr: %v", err)
	}
	notifications, err := store.ListNotifications(ctx, sqlite.NotificationFilter{SessionID: string(rec.ID), Limit: 10})
	if err != nil {
		t.Fatalf("list notifications: %v", err)
	}
	if len(notifications) != 1 || notifications[0].SemanticType != "pr.merged" {
		t.Fatalf("pr_merged notification missing: %+v", notifications)
	}
	assertNotificationCreatedCDC(t, store, startSeq)
}

// assertNotificationCreatedCDC fails the test unless one of the (at most 20)
// change_log rows read after sequence `after` has the notification_created
// event type. A read error also fails the test.
func assertNotificationCreatedCDC(t *testing.T, store *sqlite.Store, after int64) {
	t.Helper()

	const window = 20
	rows, readErr := store.ReadChangeLogAfter(context.Background(), after, window)
	if readErr != nil {
		t.Fatalf("read change_log: %v", readErr)
	}

	wanted := string(cdc.EventNotificationCreated)
	for i := range rows {
		if rows[i].EventType == wanted {
			return
		}
	}
	t.Fatalf("missing notification_created CDC after %d: %+v", after, rows)
}

// ---- small helpers ----

type pollerSource struct{ *sqlite.Store }
Expand Down
2 changes: 1 addition & 1 deletion backend/internal/lifecycle/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (m *Manager) ApplyPRObservation(ctx context.Context, id domain.SessionID, o
}
if changed {
m.clearReactions(id)
return m.fireNotify(ctx, id, rec.ProjectID, reactions[rxMerged])
return m.fireNotify(ctx, id, rec.ProjectID, rxMerged, reactions[rxMerged])
}
return nil
}
Expand Down
36 changes: 26 additions & 10 deletions backend/internal/lifecycle/reactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func (m *Manager) dispatch(ctx context.Context, id domain.SessionID, project dom
if cfg.toAgent {
return m.fireAgentEntry(ctx, id, project, key, cfg)
}
return m.fireNotify(ctx, id, project, cfg)
return m.fireNotify(ctx, id, project, key, cfg)
}

// reactionFor maps (session state, PR facts) to the reaction to enter. CI failure
Expand Down Expand Up @@ -312,7 +312,11 @@ func (m *Manager) fireFeedback(ctx context.Context, id domain.SessionID, project
m.react.mu.Lock()
t.escalated = true
m.react.mu.Unlock()
return m.escalate(ctx, id, pid, key)
cause := "max_attempts"
if key == rxCIFailed {
cause = "max_retries"
}
return m.escalate(ctx, id, pid, key, ports.EscalationEvent{Attempts: attempts, Cause: cause})
}
return m.messenger.Send(ctx, id, message)
}
Expand All @@ -339,28 +343,40 @@ func (m *Manager) fireAgentEntry(ctx context.Context, id domain.SessionID, proje
return m.messenger.Send(ctx, id, cfg.message)
}

func (m *Manager) fireNotify(ctx context.Context, id domain.SessionID, project domain.ProjectID, cfg reactionConfig) error {
func (m *Manager) fireNotify(ctx context.Context, id domain.SessionID, project domain.ProjectID, key reactionKey, cfg reactionConfig) error {
return m.notifier.Notify(ctx, ports.Event{
Type: cfg.eventType, Priority: cfg.priority,
SessionID: id, ProjectID: project, Message: cfg.message,
Reaction: &ports.ReactionEvent{Key: string(key), Action: "notify"},
CauseKey: string(key),
OccurredAt: m.clock(),
})
}

func (m *Manager) escalate(ctx context.Context, id domain.SessionID, project domain.ProjectID, key reactionKey) error {
func (m *Manager) escalate(ctx context.Context, id domain.SessionID, project domain.ProjectID, key reactionKey, esc ports.EscalationEvent) error {
if esc.Cause == "" {
esc.Cause = "max_attempts"
}
return m.notifier.Notify(ctx, ports.Event{
Type: "reaction.escalated", Priority: ports.PriorityUrgent,
SessionID: id, ProjectID: project,
Message: fmt.Sprintf("Automatic handling of %q is exhausted — needs a human.", key),
Message: fmt.Sprintf("Automatic handling of %q is exhausted — needs a human.", key),
Reaction: &ports.ReactionEvent{Key: string(key), Action: "escalated"},
Escalation: &esc,
CauseKey: string(key) + ":" + esc.Cause,
OccurredAt: m.clock(),
})
}

// TickEscalations fires the duration-based escalations the synchronous engine
// cannot wake itself for. The reaper calls it on a timer.
func (m *Manager) TickEscalations(ctx context.Context, now time.Time) error {
type due struct {
id domain.SessionID
project domain.ProjectID
key reactionKey
id domain.SessionID
project domain.ProjectID
key reactionKey
attempts int
durationMs int64
}
var fire []due
m.react.mu.Lock()
Expand All @@ -371,13 +387,13 @@ func (m *Manager) TickEscalations(ctx context.Context, now time.Time) error {
cfg := reactions[k.key]
if cfg.escalateAfter > 0 && !t.firstAt.IsZero() && now.Sub(t.firstAt) >= cfg.escalateAfter {
t.escalated = true
fire = append(fire, due{k.id, t.projectID, k.key})
fire = append(fire, due{k.id, t.projectID, k.key, t.attempts, now.Sub(t.firstAt).Milliseconds()})
}
}
m.react.mu.Unlock()

for _, d := range fire {
if err := m.escalate(ctx, d.id, d.project, d.key); err != nil {
if err := m.escalate(ctx, d.id, d.project, d.key, ports.EscalationEvent{Attempts: d.attempts, Cause: "max_duration", DurationMs: d.durationMs}); err != nil {
return err
}
}
Expand Down
74 changes: 74 additions & 0 deletions backend/internal/notification/dedupe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package notification

import (
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"time"

"github.com/aoagents/agent-orchestrator/backend/internal/domain"
"github.com/aoagents/agent-orchestrator/backend/internal/ports"
)

// ConditionHash returns a deterministic, compact hash over a condition vector:
// the lowercase hex encoding (32 characters) of the first 16 bytes of the
// SHA-256 digest of the JSON array encoding of parts.
func ConditionHash(parts ...string) string {
	// Marshalling a []string cannot fail, so the error is deliberately dropped.
	payload, _ := json.Marshal(parts)

	hasher := sha256.New()
	hasher.Write(payload)
	digest := hasher.Sum(nil)

	return hex.EncodeToString(digest[:16])
}

// DedupeKey returns the stable durable notification idempotency key. The key
// is the versioned prefix "v1:lifecycle" followed by the project ID, session
// ID, reaction key and condition hash, joined with colons.
func DedupeKey(projectID domain.ProjectID, sessionID domain.SessionID, reactionKey, conditionHash string) string {
	const layout = "v1:lifecycle:%s:%s:%s:%s"
	key := fmt.Sprintf(layout, projectID, sessionID, reactionKey, conditionHash)
	return key
}

// ComputeDedupeKey derives a restart-safe dedupe key from the lifecycle event
// plus current persisted state. PR updated_at is deliberately left out of the
// condition vector: re-polling the same facts after a daemon restart would
// otherwise create duplicate notifications. The session's own UpdatedAt is
// part of the vector.
//
// The project ID comes from the event, falling back to the session record when
// the event carries none.
func ComputeDedupeKey(event ports.Event, rec domain.SessionRecord, pr domain.PRFacts) string {
	owner := event.ProjectID
	if owner == "" {
		owner = rec.ProjectID
	}
	reaction := reactionKeyForEvent(event)

	// Session-level facts are always present, in this fixed order.
	vector := []string{
		"session_state", string(rec.Lifecycle.Session.State),
		"termination", string(rec.Lifecycle.TerminationReason),
		"session_updated", timeKey(rec.UpdatedAt),
	}

	// PR facts contribute only when a PR exists.
	if pr.Exists {
		vector = append(vector, "pr_url", pr.URL)
		vector = append(vector, "pr_number", fmt.Sprint(pr.Number))
		vector = append(vector, "pr_draft", fmt.Sprint(pr.Draft))
		vector = append(vector, "pr_merged", fmt.Sprint(pr.Merged))
		vector = append(vector, "pr_closed", fmt.Sprint(pr.Closed))
		vector = append(vector, "ci", string(pr.CI))
		vector = append(vector, "review", string(pr.Review))
		vector = append(vector, "mergeability", string(pr.Mergeability))
		vector = append(vector, "review_comments", fmt.Sprint(pr.ReviewComments))
	}

	if cause := event.CauseKey; cause != "" {
		vector = append(vector, "cause_key", cause)
	}
	if esc := event.Escalation; esc != nil {
		vector = append(vector, "escalation_cause", esc.Cause)
	}

	return DedupeKey(owner, event.SessionID, reaction, ConditionHash(vector...))
}

// reactionKeyForEvent returns the event's explicit reaction key when it has a
// reaction with a non-empty key; otherwise it derives one from the event type
// via reactionKeyFromType.
func reactionKeyForEvent(event ports.Event) string {
	reaction := event.Reaction
	if reaction == nil || reaction.Key == "" {
		return reactionKeyFromType(event.Type)
	}
	return reaction.Key
}

// timeKey renders t for use in a condition vector: the empty string for the
// zero time, otherwise t converted to UTC and formatted as RFC 3339 with
// nanosecond precision.
func timeKey(t time.Time) string {
	var key string
	if !t.IsZero() {
		key = t.UTC().Format(time.RFC3339Nano)
	}
	return key
}
Loading
Loading