From 5303c51d29406d21fb6196373b5602483a2fb882 Mon Sep 17 00:00:00 2001 From: whoisasx Date: Mon, 1 Jun 2026 00:05:40 +0530 Subject: [PATCH] feat: add durable notification foundation --- backend/internal/cdc/event.go | 12 +- backend/internal/domain/notification.go | 44 ++ .../integration/lifecycle_sqlite_test.go | 139 ++++++ backend/internal/lifecycle/manager.go | 2 +- backend/internal/lifecycle/reactions.go | 36 +- backend/internal/notification/dedupe.go | 74 +++ backend/internal/notification/dedupe_test.go | 63 +++ backend/internal/notification/enqueuer.go | 50 ++ .../internal/notification/enqueuer_test.go | 38 ++ backend/internal/notification/payload.go | 65 +++ backend/internal/notification/renderer.go | 198 ++++++++ .../internal/notification/renderer_test.go | 133 +++++ backend/internal/ports/outbound.go | 38 +- backend/internal/storage/sqlite/db.go | 4 +- backend/internal/storage/sqlite/gen/models.go | 20 + .../storage/sqlite/gen/notifications.sql.go | 464 ++++++++++++++++++ .../internal/storage/sqlite/gen/querier.go | 10 + .../sqlite/migrations/0002_notifications.sql | 81 +++ .../storage/sqlite/notification_store.go | 242 +++++++++ .../storage/sqlite/notification_store_test.go | 232 +++++++++ backend/internal/storage/sqlite/pr_facts.go | 45 ++ .../storage/sqlite/queries/notifications.sql | 70 +++ backend/lifecycle_wiring.go | 16 +- backend/wiring_test.go | 6 +- 24 files changed, 2045 insertions(+), 37 deletions(-) create mode 100644 backend/internal/domain/notification.go create mode 100644 backend/internal/notification/dedupe.go create mode 100644 backend/internal/notification/dedupe_test.go create mode 100644 backend/internal/notification/enqueuer.go create mode 100644 backend/internal/notification/enqueuer_test.go create mode 100644 backend/internal/notification/payload.go create mode 100644 backend/internal/notification/renderer.go create mode 100644 backend/internal/notification/renderer_test.go create mode 100644 backend/internal/storage/sqlite/gen/notifications.sql.go create mode 100644 backend/internal/storage/sqlite/migrations/0002_notifications.sql create mode 100644 backend/internal/storage/sqlite/notification_store.go create mode 100644 backend/internal/storage/sqlite/notification_store_test.go create mode 100644 backend/internal/storage/sqlite/pr_facts.go create mode 100644 backend/internal/storage/sqlite/queries/notifications.sql diff --git a/backend/internal/cdc/event.go b/backend/internal/cdc/event.go index 04f52648..5d37f47e 100644 --- a/backend/internal/cdc/event.go +++ b/backend/internal/cdc/event.go @@ -18,11 +18,13 @@ import ( type EventType string const ( - EventSessionCreated EventType = "session_created" - EventSessionUpdated EventType = "session_updated" - EventPRCreated EventType = "pr_created" - EventPRUpdated EventType = "pr_updated" - EventPRCheckRecorded EventType = "pr_check_recorded" + EventSessionCreated EventType = "session_created" + EventSessionUpdated EventType = "session_updated" + EventPRCreated EventType = "pr_created" + EventPRUpdated EventType = "pr_updated" + EventPRCheckRecorded EventType = "pr_check_recorded" + EventNotificationCreated EventType = "notification_created" + EventNotificationUpdated EventType = "notification_updated" ) // Event is one CDC change read from change_log. Seq is the monotonic ordering + diff --git a/backend/internal/domain/notification.go b/backend/internal/domain/notification.go new file mode 100644 index 00000000..8c64c9bc --- /dev/null +++ b/backend/internal/domain/notification.go @@ -0,0 +1,44 @@ +package domain + +import ( + "encoding/json" + "time" +) + +// NotificationID is the stable public identifier for a persisted notification. +type NotificationID string + +// Notification is the provider-neutral durable notification read model. It is +// sink-agnostic: desktop, dashboard, Slack, webhooks, etc. all consume the same +// semantic payload and actions later. +type Notification struct { + Seq int64 + ID NotificationID + ProjectID ProjectID + SessionID SessionID + Source string + EventType string + SemanticType string + Priority string + Message string + Payload json.RawMessage + Actions []NotificationAction + DedupeKey string + CauseKey string + ReadAt time.Time + ArchivedAt time.Time + CreatedAt time.Time + UpdatedAt time.Time +} + +// NotificationAction is a provider-neutral action descriptor. Renderers may use +// Route for app-local navigation, URL for external navigation, or Method for a +// future command/action endpoint. +type NotificationAction struct { + ID string `json:"id"` + Kind string `json:"kind"` + Label string `json:"label"` + Route string `json:"route,omitempty"` + URL string `json:"url,omitempty"` + Method string `json:"method,omitempty"` +} diff --git a/backend/internal/integration/lifecycle_sqlite_test.go b/backend/internal/integration/lifecycle_sqlite_test.go index 47745508..c353bc6d 100644 --- a/backend/internal/integration/lifecycle_sqlite_test.go +++ b/backend/internal/integration/lifecycle_sqlite_test.go @@ -7,6 +7,8 @@ package integration import ( "context" + "io" + "log/slog" "path/filepath" "strings" "sync" @@ -16,6 +18,7 @@ import ( "github.com/aoagents/agent-orchestrator/backend/internal/cdc" "github.com/aoagents/agent-orchestrator/backend/internal/domain" "github.com/aoagents/agent-orchestrator/backend/internal/lifecycle" + "github.com/aoagents/agent-orchestrator/backend/internal/notification" "github.com/aoagents/agent-orchestrator/backend/internal/ports" "github.com/aoagents/agent-orchestrator/backend/internal/session" "github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite" @@ -268,6 +271,34 @@ func seedProject(t *testing.T, store *sqlite.Store, id string) { } } +func durableLifecycle(store *sqlite.Store, messenger ports.AgentMessenger) *lifecycle.Manager { + adapter := storeAdapter{store} + renderer := notification.NewRenderer(store) + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) + notifier := notification.NewEnqueuer(store, renderer, logger) + return lifecycle.New(adapter, adapter, notifier, messenger) +} + +func durableRecord(project, issue, branch string) domain.SessionRecord { + now := time.Now().UTC().Truncate(time.Second) + return domain.SessionRecord{ + ProjectID: domain.ProjectID(project), + IssueID: domain.IssueID(issue), + Kind: domain.KindWorker, + Lifecycle: domain.CanonicalSessionLifecycle{ + Version: domain.LifecycleVersion, + Session: domain.SessionSubstate{State: domain.SessionWorking}, + IsAlive: true, + Activity: domain.ActivitySubstate{ + State: domain.ActivityActive, LastActivityAt: now, Source: domain.SourceHook, + }, + }, + Metadata: domain.SessionMetadata{Branch: branch, WorkspacePath: "/workspace/" + branch}, + CreatedAt: now, + UpdatedAt: now, + } +} + // ---- tests ---- // TestHappyPath drives Spawn -> SCM PR observation (open + CI passing) -> Kill, @@ -653,6 +684,114 @@ func TestCDCPollerReceivesAllStages(t *testing.T) { } } +func TestLifecycleDurableNotification_NeedsInput(t *testing.T) { + t.Parallel() + ctx := context.Background() + store, err := sqlite.Open(t.TempDir()) + if err != nil { + t.Fatalf("open sqlite: %v", err) + } + defer store.Close() + seedProject(t, store, "mer") + rec, err := store.CreateSession(ctx, durableRecord("mer", "MER-1", "feat/input")) + if err != nil { + t.Fatalf("create session: %v", err) + } + lcm := durableLifecycle(store, &captureMessenger{}) + startSeq, _ := store.MaxChangeLogSeq(ctx) + + if err := lcm.ApplyActivitySignal(ctx, rec.ID, ports.ActivitySignal{ + Valid: true, State: domain.ActivityWaitingInput, Source: domain.SourceHook, Timestamp: time.Now(), + }); err != nil { + t.Fatalf("activity: %v", err) + } + + notifications, err := store.ListNotifications(ctx, sqlite.NotificationFilter{SessionID: string(rec.ID), Limit: 10}) + if err != nil { + t.Fatalf("list notifications: %v", err) + } + if len(notifications) != 1 || notifications[0].SemanticType != "session.needs_input" || notifications[0].DedupeKey == "" { + t.Fatalf("needs_input notification missing: %+v", notifications) + } + assertNotificationCreatedCDC(t, store, startSeq) +} + +func TestLifecycleDurableNotification_ApprovedAndGreen(t *testing.T) { + t.Parallel() + ctx := context.Background() + store, err := sqlite.Open(t.TempDir()) + if err != nil { + t.Fatalf("open sqlite: %v", err) + } + defer store.Close() + seedProject(t, store, "mer") + rec, err := store.CreateSession(ctx, durableRecord("mer", "MER-2", "feat/green")) + if err != nil { + t.Fatalf("create session: %v", err) + } + lcm := durableLifecycle(store, &captureMessenger{}) + + if err := lcm.ApplyPRObservation(ctx, rec.ID, ports.PRObservation{ + Fetched: true, URL: "https://github.com/org/repo/pull/2", Number: 2, + CI: domain.CIPassing, Review: domain.ReviewApproved, Mergeability: domain.MergeMergeable, + }); err != nil { + t.Fatalf("apply pr: %v", err) + } + notifications, err := store.ListNotifications(ctx, sqlite.NotificationFilter{SessionID: string(rec.ID), Limit: 10}) + if err != nil { + t.Fatalf("list notifications: %v", err) + } + if len(notifications) != 1 || notifications[0].SemanticType != "merge.ready" { + t.Fatalf("approved-and-green notification missing: %+v", notifications) + } +} + +func TestLifecycleDurableNotification_PRMerged(t *testing.T) { + t.Parallel() + ctx := context.Background() + store, err := sqlite.Open(t.TempDir()) + if err != nil { + t.Fatalf("open sqlite: %v", err) + } + defer store.Close() + seedProject(t, store, "mer") + rec, err := store.CreateSession(ctx, durableRecord("mer", "MER-3", "feat/merge")) + if err != nil { + t.Fatalf("create session: %v", err) + } + lcm := durableLifecycle(store, &captureMessenger{}) + startSeq, _ := store.MaxChangeLogSeq(ctx) + + if err := lcm.ApplyPRObservation(ctx, rec.ID, ports.PRObservation{ + Fetched: true, URL: "https://github.com/org/repo/pull/3", Number: 3, Merged: true, + CI: domain.CIPassing, Review: domain.ReviewApproved, Mergeability: domain.MergeMergeable, + }); err != nil { + t.Fatalf("apply pr: %v", err) + } + notifications, err := store.ListNotifications(ctx, sqlite.NotificationFilter{SessionID: string(rec.ID), Limit: 10}) + if err != nil { + t.Fatalf("list notifications: %v", err) + } + if len(notifications) != 1 || notifications[0].SemanticType != "pr.merged" { + t.Fatalf("pr_merged notification missing: %+v", notifications) + } + assertNotificationCreatedCDC(t, store, startSeq) +} + +func assertNotificationCreatedCDC(t *testing.T, store *sqlite.Store, after int64) { + t.Helper() + evs, err := store.ReadChangeLogAfter(context.Background(), after, 20) + if err != nil { + t.Fatalf("read change_log: %v", err) + } + for _, e := range evs { + if e.EventType == string(cdc.EventNotificationCreated) { + return + } + } + t.Fatalf("missing notification_created CDC after %d: %+v", after, evs) +} + // ---- small helpers ---- type pollerSource struct{ *sqlite.Store } diff --git a/backend/internal/lifecycle/manager.go b/backend/internal/lifecycle/manager.go index f61d38b4..438a76c6 100644 --- a/backend/internal/lifecycle/manager.go +++ b/backend/internal/lifecycle/manager.go @@ -168,7 +168,7 @@ func (m *Manager) ApplyPRObservation(ctx context.Context, id domain.SessionID, o } if changed { m.clearReactions(id) - return m.fireNotify(ctx, id, rec.ProjectID, reactions[rxMerged]) + return m.fireNotify(ctx, id, rec.ProjectID, rxMerged, reactions[rxMerged]) } return nil } diff --git a/backend/internal/lifecycle/reactions.go b/backend/internal/lifecycle/reactions.go index 94f149f4..44419aa6 100644 --- a/backend/internal/lifecycle/reactions.go +++ b/backend/internal/lifecycle/reactions.go @@ -208,7 +208,7 @@ func (m *Manager) dispatch(ctx context.Context, id domain.SessionID, project dom if cfg.toAgent { return m.fireAgentEntry(ctx, id, project, key, cfg) } - return m.fireNotify(ctx, id, project, cfg) + return m.fireNotify(ctx, id, project, key, cfg) } // reactionFor maps (session state, PR facts) to the reaction to enter. CI failure @@ -312,7 +312,11 @@ func (m *Manager) fireFeedback(ctx context.Context, id domain.SessionID, project m.react.mu.Lock() t.escalated = true m.react.mu.Unlock() - return m.escalate(ctx, id, pid, key) + cause := "max_attempts" + if key == rxCIFailed { + cause = "max_retries" + } + return m.escalate(ctx, id, pid, key, ports.EscalationEvent{Attempts: attempts, Cause: cause}) } return m.messenger.Send(ctx, id, message) } @@ -339,18 +343,28 @@ func (m *Manager) fireAgentEntry(ctx context.Context, id domain.SessionID, proje return m.messenger.Send(ctx, id, cfg.message) } -func (m *Manager) fireNotify(ctx context.Context, id domain.SessionID, project domain.ProjectID, cfg reactionConfig) error { +func (m *Manager) fireNotify(ctx context.Context, id domain.SessionID, project domain.ProjectID, key reactionKey, cfg reactionConfig) error { return m.notifier.Notify(ctx, ports.Event{ Type: cfg.eventType, Priority: cfg.priority, SessionID: id, ProjectID: project, Message: cfg.message, + Reaction: &ports.ReactionEvent{Key: string(key), Action: "notify"}, + CauseKey: string(key), + OccurredAt: m.clock(), }) } -func (m *Manager) escalate(ctx context.Context, id domain.SessionID, project domain.ProjectID, key reactionKey) error { +func (m *Manager) escalate(ctx context.Context, id domain.SessionID, project domain.ProjectID, key reactionKey, esc ports.EscalationEvent) error { + if esc.Cause == "" { + esc.Cause = "max_attempts" + } return m.notifier.Notify(ctx, ports.Event{ Type: "reaction.escalated", Priority: ports.PriorityUrgent, SessionID: id, ProjectID: project, - Message: fmt.Sprintf("Automatic handling of %q is exhausted — needs a human.", key), + Message: fmt.Sprintf("Automatic handling of %q is exhausted — needs a human.", key), + Reaction: &ports.ReactionEvent{Key: string(key), Action: "escalated"}, + Escalation: &esc, + CauseKey: string(key) + ":" + esc.Cause, + OccurredAt: m.clock(), }) } @@ -358,9 +372,11 @@ func (m *Manager) escalate(ctx context.Context, id domain.SessionID, project dom // cannot wake itself for. The reaper calls it on a timer. func (m *Manager) TickEscalations(ctx context.Context, now time.Time) error { type due struct { - id domain.SessionID - project domain.ProjectID - key reactionKey + id domain.SessionID + project domain.ProjectID + key reactionKey + attempts int + durationMs int64 } var fire []due m.react.mu.Lock() @@ -371,13 +387,13 @@ func (m *Manager) TickEscalations(ctx context.Context, now time.Time) error { cfg := reactions[k.key] if cfg.escalateAfter > 0 && !t.firstAt.IsZero() && now.Sub(t.firstAt) >= cfg.escalateAfter { t.escalated = true - fire = append(fire, due{k.id, t.projectID, k.key}) + fire = append(fire, due{k.id, t.projectID, k.key, t.attempts, now.Sub(t.firstAt).Milliseconds()}) } } m.react.mu.Unlock() for _, d := range fire { - if err := m.escalate(ctx, d.id, d.project, d.key); err != nil { + if err := m.escalate(ctx, d.id, d.project, d.key, ports.EscalationEvent{Attempts: d.attempts, Cause: "max_duration", DurationMs: d.durationMs}); err != nil { return err } } diff --git a/backend/internal/notification/dedupe.go b/backend/internal/notification/dedupe.go new file mode 100644 index 00000000..a4eaf326 --- /dev/null +++ b/backend/internal/notification/dedupe.go @@ -0,0 +1,74 @@ +package notification + +import ( + "crypto/sha256" + "encoding/hex" + "encoding/json" + "fmt" + "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" + "github.com/aoagents/agent-orchestrator/backend/internal/ports" +) + +// ConditionHash returns a deterministic, compact hash over a condition vector. +func ConditionHash(parts ...string) string { + b, _ := json.Marshal(parts) + sum := sha256.Sum256(b) + return hex.EncodeToString(sum[:16]) +} + +// DedupeKey returns the stable durable notification idempotency key. +func DedupeKey(projectID domain.ProjectID, sessionID domain.SessionID, reactionKey, conditionHash string) string { + return fmt.Sprintf("v1:lifecycle:%s:%s:%s:%s", projectID, sessionID, reactionKey, conditionHash) +} + +// ComputeDedupeKey derives a restart-safe dedupe key from the lifecycle event +// plus current persisted state. It avoids PR updated_at because re-polling the +// same facts after daemon restart would otherwise create duplicate notifications. +func ComputeDedupeKey(event ports.Event, rec domain.SessionRecord, pr domain.PRFacts) string { + projectID := event.ProjectID + if projectID == "" { + projectID = rec.ProjectID + } + reactionKey := reactionKeyForEvent(event) + condition := []string{ + "session_state", string(rec.Lifecycle.Session.State), + "termination", string(rec.Lifecycle.TerminationReason), + "session_updated", timeKey(rec.UpdatedAt), + } + if pr.Exists { + condition = append(condition, + "pr_url", pr.URL, + "pr_number", fmt.Sprint(pr.Number), + "pr_draft", fmt.Sprint(pr.Draft), + "pr_merged", fmt.Sprint(pr.Merged), + "pr_closed", fmt.Sprint(pr.Closed), + "ci", string(pr.CI), + "review", string(pr.Review), + "mergeability", string(pr.Mergeability), + "review_comments", fmt.Sprint(pr.ReviewComments), + ) + } + if event.CauseKey != "" { + condition = append(condition, "cause_key", event.CauseKey) + } + if event.Escalation != nil { + condition = append(condition, "escalation_cause", event.Escalation.Cause) + } + return DedupeKey(projectID, event.SessionID, reactionKey, ConditionHash(condition...)) +} + +func reactionKeyForEvent(event ports.Event) string { + if event.Reaction != nil && event.Reaction.Key != "" { + return event.Reaction.Key + } + return reactionKeyFromType(event.Type) +} + +func timeKey(t time.Time) string { + if t.IsZero() { + return "" + } + return t.UTC().Format(time.RFC3339Nano) +} diff --git a/backend/internal/notification/dedupe_test.go b/backend/internal/notification/dedupe_test.go new file mode 100644 index 00000000..2730bc10 --- /dev/null +++ b/backend/internal/notification/dedupe_test.go @@ -0,0 +1,63 @@ +package notification + +import ( + "testing" + "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" + "github.com/aoagents/agent-orchestrator/backend/internal/ports" +) + +func TestDedupeSameReactionConditionProducesSameKey(t *testing.T) { + rec := dedupeRecord("working", time.Date(2026, 1, 2, 3, 4, 5, 0, time.UTC)) + e := ports.Event{SessionID: "ao-1", Reaction: &ports.ReactionEvent{Key: "agent-needs-input", Action: "notify"}} + + k1 := ComputeDedupeKey(e, rec, domain.PRFacts{}) + k2 := ComputeDedupeKey(e, rec, domain.PRFacts{}) + if k1 != k2 { + t.Fatalf("dedupe key unstable: %q != %q", k1, k2) + } +} + +func TestDedupeChangedConditionProducesNewKey(t *testing.T) { + e := ports.Event{SessionID: "ao-1", Reaction: &ports.ReactionEvent{Key: "agent-needs-input", Action: "notify"}} + r1 := dedupeRecord("needs_input", time.Date(2026, 1, 2, 3, 4, 5, 0, time.UTC)) + r2 := dedupeRecord("needs_input", time.Date(2026, 1, 2, 3, 4, 6, 0, time.UTC)) + + if ComputeDedupeKey(e, r1, domain.PRFacts{}) == ComputeDedupeKey(e, r2, domain.PRFacts{}) { + t.Fatal("changed session updated timestamp should change dedupe key") + } +} + +func TestDedupeEscalationIncludesCauseAndDoesNotCollideWithBase(t *testing.T) { + rec := dedupeRecord("working", time.Date(2026, 1, 2, 3, 4, 5, 0, time.UTC)) + base := ports.Event{SessionID: "ao-1", Reaction: &ports.ReactionEvent{Key: "ci-failed", Action: "notify"}} + esc := ports.Event{ + SessionID: "ao-1", + Reaction: &ports.ReactionEvent{Key: "ci-failed", Action: "escalated"}, + Escalation: &ports.EscalationEvent{Attempts: 3, Cause: "max_retries"}, + } + otherCause := esc + otherCause.Escalation = &ports.EscalationEvent{Attempts: 3, Cause: "max_duration"} + + baseKey := ComputeDedupeKey(base, rec, domain.PRFacts{Exists: true, URL: "pr", CI: domain.CIFailing}) + escKey := ComputeDedupeKey(esc, rec, domain.PRFacts{Exists: true, URL: "pr", CI: domain.CIFailing}) + otherKey := ComputeDedupeKey(otherCause, rec, domain.PRFacts{Exists: true, URL: "pr", CI: domain.CIFailing}) + if baseKey == escKey { + t.Fatal("escalation dedupe key should not collide with base reaction") + } + if escKey == otherKey { + t.Fatal("escalation cause should affect dedupe key") + } +} + +func dedupeRecord(state domain.SessionState, updated time.Time) domain.SessionRecord { + return domain.SessionRecord{ + ID: "ao-1", + ProjectID: "ao", + Lifecycle: domain.CanonicalSessionLifecycle{ + Session: domain.SessionSubstate{State: state}, + }, + UpdatedAt: updated, + } +} diff --git a/backend/internal/notification/enqueuer.go b/backend/internal/notification/enqueuer.go new file mode 100644 index 00000000..79e902bf --- /dev/null +++ b/backend/internal/notification/enqueuer.go @@ -0,0 +1,50 @@ +package notification + +import ( + "context" + "log/slog" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" + "github.com/aoagents/agent-orchestrator/backend/internal/ports" +) + +// Store is the durable write-side used by the enqueuer. *sqlite.Store satisfies +// this interface. +type Store interface { + EnqueueNotification(ctx context.Context, row domain.Notification) (domain.Notification, bool, error) +} + +// Enqueuer is a store-backed ports.Notifier. It does not deliver to external +// sinks; it renders and persists the notification for later dashboard/app sinks. +type Enqueuer struct { + store Store + renderer *Renderer + logger *slog.Logger +} + +var _ ports.Notifier = (*Enqueuer)(nil) + +func NewEnqueuer(store Store, renderer *Renderer, logger *slog.Logger) *Enqueuer { + if logger == nil { + logger = slog.Default() + } + return &Enqueuer{store: store, renderer: renderer, logger: logger} +} + +func (e *Enqueuer) Notify(ctx context.Context, event ports.Event) error { + row, err := e.renderer.Render(ctx, event) + if err != nil { + return err + } + saved, created, err := e.store.EnqueueNotification(ctx, row) + if err != nil { + return err + } + e.logger.DebugContext(ctx, "notification enqueued", + "id", saved.ID, + "session", saved.SessionID, + "semantic_type", saved.SemanticType, + "created", created, + ) + return nil +} diff --git a/backend/internal/notification/enqueuer_test.go b/backend/internal/notification/enqueuer_test.go new file mode 100644 index 00000000..1ed14461 --- /dev/null +++ b/backend/internal/notification/enqueuer_test.go @@ -0,0 +1,38 @@ +package notification + +import ( + "context" + "io" + "log/slog" + "testing" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" + "github.com/aoagents/agent-orchestrator/backend/internal/ports" +) + +type fakeNotificationStore struct { + row domain.Notification + created bool +} + +func (f *fakeNotificationStore) EnqueueNotification(_ context.Context, row domain.Notification) (domain.Notification, bool, error) { + f.row = row + f.created = true + return row, true, nil +} + +func TestEnqueuerRendersAndPersists(t *testing.T) { + store := &fakeNotificationStore{} + renderer := NewRenderer(fakeReader{rec: renderRecord()}) + enq := NewEnqueuer(store, renderer, slog.New(slog.NewTextHandler(io.Discard, nil))) + if err := enq.Notify(context.Background(), ports.Event{ + Type: "reaction.agent-needs-input", Priority: ports.PriorityUrgent, + ProjectID: "ao", SessionID: "ao-7", Message: "needs input", + Reaction: &ports.ReactionEvent{Key: "agent-needs-input", Action: "notify"}, + }); err != nil { + t.Fatal(err) + } + if !store.created || store.row.SemanticType != "session.needs_input" || store.row.DedupeKey == "" { + t.Fatalf("store row not rendered: created=%v row=%+v", store.created, store.row) + } +} diff --git a/backend/internal/notification/payload.go b/backend/internal/notification/payload.go new file mode 100644 index 00000000..5492c19c --- /dev/null +++ b/backend/internal/notification/payload.go @@ -0,0 +1,65 @@ +package notification + +// PayloadSchemaVersion is the durable notification payload contract version. +const PayloadSchemaVersion = 3 + +// Payload is the provider-neutral, rich notification data shape persisted in +// SQLite. It intentionally mirrors legacy AO's NotificationData V3 while only +// filling fields the Go rewrite can source today. +type Payload struct { + SchemaVersion int `json:"schemaVersion"` + SemanticType string `json:"semanticType"` + Subject SubjectPayload `json:"subject"` + Reaction *ReactionPayload `json:"reaction,omitempty"` + Escalation *EscalationPayload `json:"escalation,omitempty"` + CI *CIPayload `json:"ci,omitempty"` + Review *ReviewPayload `json:"review,omitempty"` + Merge *MergePayload `json:"merge,omitempty"` +} + +type SubjectPayload struct { + Session *SessionSubjectPayload `json:"session,omitempty"` + PR *PRSubjectPayload `json:"pr,omitempty"` + Issue *IssueSubjectPayload `json:"issue,omitempty"` + Branch string `json:"branch,omitempty"` +} + +type SessionSubjectPayload struct { + ID string `json:"id"` + ProjectID string `json:"projectId"` +} + +type PRSubjectPayload struct { + Number int `json:"number,omitempty"` + URL string `json:"url,omitempty"` + Draft bool `json:"draft,omitempty"` +} + +type IssueSubjectPayload struct { + ID string `json:"id,omitempty"` +} + +type ReactionPayload struct { + Key string `json:"key"` + Action string `json:"action"` +} + +type EscalationPayload struct { + Attempts int `json:"attempts"` + Cause string `json:"cause"` + DurationMs int64 `json:"durationMs"` +} + +type CIPayload struct { + Status string `json:"status"` +} + +type ReviewPayload struct { + Decision string `json:"decision"` +} + +type MergePayload struct { + Ready *bool `json:"ready,omitempty"` + Conflicts *bool `json:"conflicts,omitempty"` + IsBehind *bool `json:"isBehind,omitempty"` +} diff --git a/backend/internal/notification/renderer.go b/backend/internal/notification/renderer.go new file mode 100644 index 00000000..21d41e37 --- /dev/null +++ b/backend/internal/notification/renderer.go @@ -0,0 +1,198 @@ +package notification + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" + "github.com/aoagents/agent-orchestrator/backend/internal/ports" +) + +// Reader is the subset of durable state the renderer rehydrates. *sqlite.Store +// satisfies it directly. +type Reader interface { + GetSession(ctx context.Context, id domain.SessionID) (domain.SessionRecord, bool, error) + PRFactsForSession(ctx context.Context, id domain.SessionID) (domain.PRFacts, error) +} + +// Renderer converts lifecycle notification events into durable notification rows. +type Renderer struct { + reader Reader + clock func() time.Time +} + +func NewRenderer(reader Reader) *Renderer { + return &Renderer{reader: reader, clock: time.Now} +} + +func (r *Renderer) Render(ctx context.Context, event ports.Event) (domain.Notification, error) { + if event.SessionID == "" { + return domain.Notification{}, fmt.Errorf("render notification: missing session id") + } + rec, ok, err := r.reader.GetSession(ctx, event.SessionID) + if err != nil { + return domain.Notification{}, fmt.Errorf("render notification: get session %s: %w", event.SessionID, err) + } + if !ok { + return domain.Notification{}, fmt.Errorf("render notification: session %s not found", event.SessionID) + } + pr, err := r.reader.PRFactsForSession(ctx, event.SessionID) + if err != nil { + return domain.Notification{}, fmt.Errorf("render notification: pr facts for %s: %w", event.SessionID, err) + } + + projectID := event.ProjectID + if projectID == "" { + projectID = rec.ProjectID + } + reaction := reactionPayload(event) + semanticType := SemanticTypeForReaction(reaction.Key) + if semanticType == "" { + semanticType = event.Type + } + payload := Payload{ + SchemaVersion: PayloadSchemaVersion, + SemanticType: semanticType, + Subject: SubjectPayload{ + Session: &SessionSubjectPayload{ID: string(event.SessionID), ProjectID: string(projectID)}, + Branch: rec.Metadata.Branch, + }, + Reaction: &reaction, + } + if rec.IssueID != "" { + payload.Subject.Issue = &IssueSubjectPayload{ID: string(rec.IssueID)} + } + if pr.Exists { + payload.Subject.PR = &PRSubjectPayload{Number: pr.Number, URL: pr.URL, Draft: pr.Draft} + if pr.CI != "" { + payload.CI = &CIPayload{Status: string(pr.CI)} + } + if pr.Review != "" { + payload.Review = &ReviewPayload{Decision: string(pr.Review)} + } + payload.Merge = mergePayload(pr.Mergeability) + } + if event.Escalation != nil { + payload.Escalation = &EscalationPayload{ + Attempts: event.Escalation.Attempts, + Cause: event.Escalation.Cause, + DurationMs: event.Escalation.DurationMs, + } + } + + payloadJSON, err := json.Marshal(payload) + if err != nil { + return domain.Notification{}, fmt.Errorf("render notification payload: %w", err) + } + + occurredAt := event.OccurredAt + if occurredAt.IsZero() { + occurredAt = r.clock().UTC() + } + priority := string(event.Priority) + if priority == "" { + priority = string(ports.PriorityInfo) + } + dedupeKey := event.DedupeKey + if dedupeKey == "" { + dedupeKey = ComputeDedupeKey(event, rec, pr) + } + causeKey := event.CauseKey + if causeKey == "" { + causeKey = reaction.Key + if event.Escalation != nil && event.Escalation.Cause != "" { + causeKey += ":" + event.Escalation.Cause + } + } + + return domain.Notification{ + ProjectID: projectID, + SessionID: event.SessionID, + Source: "lifecycle", + EventType: event.Type, + SemanticType: semanticType, + Priority: priority, + Message: event.Message, + Payload: payloadJSON, + Actions: actionsFor(projectID, event.SessionID, pr), + DedupeKey: dedupeKey, + CauseKey: causeKey, + CreatedAt: occurredAt, + UpdatedAt: occurredAt, + }, nil +} + +func reactionPayload(event ports.Event) ReactionPayload { + key := reactionKeyFromType(event.Type) + action := "notify" + if event.Reaction != nil { + if event.Reaction.Key != "" { + key = event.Reaction.Key + } + if event.Reaction.Action != "" { + action = event.Reaction.Action + } + } + if event.Escalation != nil && event.Reaction == nil { + action = "escalated" + } + return ReactionPayload{Key: key, Action: action} +} + +func reactionKeyFromType(t string) string { + if strings.HasPrefix(t, "reaction.") { + return strings.TrimPrefix(t, "reaction.") + } + return t +} + +func mergePayload(m domain.Mergeability) *MergePayload { + if m == "" { + return nil + } + ready := m == domain.MergeMergeable + conflicts := m == domain.MergeConflicting + return &MergePayload{Ready: &ready, Conflicts: &conflicts} +} + +func actionsFor(projectID domain.ProjectID, sessionID domain.SessionID, pr domain.PRFacts) []domain.NotificationAction { + actions := []domain.NotificationAction{{ + ID: "open-session", + Kind: "route", + Label: "Open session", + Route: fmt.Sprintf("/projects/%s/sessions/%s", projectID, sessionID), + }} + if pr.Exists && pr.URL != "" { + actions = append(actions, domain.NotificationAction{ID: "open-pr", Kind: "url", Label: "Open PR", URL: pr.URL}) + } + return actions +} + +// SemanticTypeForReaction maps internal reaction keys to public semantic types. +func SemanticTypeForReaction(key string) string { + switch key { + case "approved-and-green": + return "merge.ready" + case "agent-stuck": + return "session.stuck" + case "agent-needs-input": + return "session.needs_input" + case "agent-exited": + return "session.exited" + case "pr-closed": + return "pr.closed" + case "pr-merged": + return "pr.merged" + case "ci-failed": + return "ci.failing" + case "review-comments": + return "review.changes_requested" + case "merge-conflicts": + return "merge.conflicts" + default: + return "" + } +} diff --git a/backend/internal/notification/renderer_test.go b/backend/internal/notification/renderer_test.go new file mode 100644 index 00000000..4cf70c97 --- /dev/null +++ b/backend/internal/notification/renderer_test.go @@ -0,0 +1,133 @@ +package notification + +import ( + "context" + "encoding/json" + "testing" + "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" + "github.com/aoagents/agent-orchestrator/backend/internal/ports" +) + +type fakeReader struct { + rec domain.SessionRecord + pr domain.PRFacts +} + +func (f fakeReader) GetSession(context.Context, domain.SessionID) (domain.SessionRecord, bool, error) { + return f.rec, true, nil +} +func (f fakeReader) PRFactsForSession(context.Context, domain.SessionID) (domain.PRFacts, error) { + return f.pr, nil +} + +func TestSemanticTypeMapping(t *testing.T) { + cases := map[string]string{ + "approved-and-green": "merge.ready", + "agent-stuck": "session.stuck", + "agent-needs-input": "session.needs_input", + "agent-exited": "session.exited", + "pr-closed": "pr.closed", + "pr-merged": "pr.merged", + "ci-failed": "ci.failing", + "review-comments": "review.changes_requested", + "merge-conflicts": "merge.conflicts", + } + for key, want := range cases { + if got := SemanticTypeForReaction(key); got != want { + t.Fatalf("SemanticTypeForReaction(%q) = %q, want %q", key, got, want) + } + } +} + +func TestRendererPayloadIncludesSessionProjectIssueAndBranch(t *testing.T) { + r := NewRenderer(fakeReader{rec: renderRecord()}) + row, err := r.Render(context.Background(), ports.Event{ + Type: "reaction.agent-needs-input", Priority: ports.PriorityUrgent, + ProjectID: "ao", SessionID: "ao-7", Message: "needs input", + Reaction: &ports.ReactionEvent{Key: "agent-needs-input", Action: "notify"}, + OccurredAt: time.Date(2026, 1, 2, 3, 4, 5, 0, time.UTC), + }) + if err != nil { + t.Fatal(err) + } + var p Payload + if err := json.Unmarshal(row.Payload, &p); err != nil { + t.Fatal(err) + } + if p.SchemaVersion != 3 || p.SemanticType != "session.needs_input" { + t.Fatalf("payload header = %+v", p) + } + if p.Subject.Session == nil || p.Subject.Session.ID != "ao-7" || p.Subject.Session.ProjectID != "ao" { + t.Fatalf("session subject missing: %+v", p.Subject.Session) + } + if p.Subject.Issue == nil || p.Subject.Issue.ID != "AO-12" || p.Subject.Branch != "feat/example" { + t.Fatalf("issue/branch missing: %+v", p.Subject) + } +} + +func TestRendererPRPayloadIncludesFacts(t *testing.T) { + r := NewRenderer(fakeReader{rec: renderRecord(), pr: domain.PRFacts{ + Exists: true, URL: "https://github.com/org/repo/pull/12", Number: 12, + CI: domain.CIFailing, Review: domain.ReviewChangesRequest, Mergeability: domain.MergeConflicting, + }}) + row, err := r.Render(context.Background(), ports.Event{ + Type: "reaction.review-comments", Priority: ports.PriorityAction, + ProjectID: "ao", SessionID: "ao-7", Message: "review", + Reaction: &ports.ReactionEvent{Key: "review-comments", Action: "notify"}, + }) + if err != nil { + t.Fatal(err) + } + var p Payload + if err := json.Unmarshal(row.Payload, &p); err != nil { + t.Fatal(err) + } + if p.Subject.PR == nil || p.Subject.PR.URL != "https://github.com/org/repo/pull/12" || p.Subject.PR.Number != 12 { + t.Fatalf("pr subject missing: %+v", p.Subject.PR) + } + if p.CI == nil || p.CI.Status != "failing" { + t.Fatalf("ci missing: %+v", p.CI) + } + if p.Review == nil || p.Review.Decision != "changes_requested" { + t.Fatalf("review missing: %+v", p.Review) + } + if p.Merge == nil || p.Merge.Conflicts == nil || *p.Merge.Conflicts != true || p.Merge.Ready == nil || *p.Merge.Ready != false { + t.Fatalf("merge missing: %+v", p.Merge) + } +} + +func TestRendererEscalationPayloadIncludesDetails(t *testing.T) { + r := NewRenderer(fakeReader{rec: renderRecord()}) + row, err := r.Render(context.Background(), ports.Event{ + Type: "reaction.escalated", Priority: ports.PriorityUrgent, + ProjectID: "ao", SessionID: "ao-7", Message: "escalated", + Reaction: &ports.ReactionEvent{Key: "ci-failed", Action: "escalated"}, + Escalation: &ports.EscalationEvent{Attempts: 3, Cause: "max_retries", DurationMs: 42}, + }) + if err != nil { + t.Fatal(err) + } + var p Payload + if err := json.Unmarshal(row.Payload, &p); err != nil { + t.Fatal(err) + } + if p.Reaction == nil || p.Reaction.Key != "ci-failed" || p.Reaction.Action != "escalated" { + t.Fatalf("reaction missing: %+v", p.Reaction) + } + if p.Escalation == nil || p.Escalation.Attempts != 3 || p.Escalation.Cause != "max_retries" || p.Escalation.DurationMs != 42 { + t.Fatalf("escalation missing: %+v", p.Escalation) + } +} + +func renderRecord() domain.SessionRecord { + return domain.SessionRecord{ + ID: "ao-7", + ProjectID: "ao", + IssueID: "AO-12", + Lifecycle: domain.CanonicalSessionLifecycle{Session: domain.SessionSubstate{State: domain.SessionNeedsInput}}, + Metadata: domain.SessionMetadata{Branch: "feat/example"}, + UpdatedAt: time.Date(2026, 1, 2, 3, 4, 5, 0, time.UTC), + } +} diff --git a/backend/internal/ports/outbound.go b/backend/internal/ports/outbound.go index 75a24bf0..79c20423 100644 --- a/backend/internal/ports/outbound.go +++ b/backend/internal/ports/outbound.go @@ -2,6 +2,7 @@ package ports import ( "context" + "time" "github.com/aoagents/agent-orchestrator/backend/internal/domain" ) @@ -46,18 +47,37 @@ type AgentMessenger interface { type Priority string const ( - PriorityUrgent Priority = "urgent" - PriorityAction Priority = "action" - PriorityInfo Priority = "info" + PriorityUrgent Priority = "urgent" + PriorityAction Priority = "action" + PriorityWarning Priority = "warning" + PriorityInfo Priority = "info" ) -// Event is a human-facing notification produced by a reaction. +// Event is a human-facing notification produced by a reaction. It carries the +// stable reaction/escalation context a durable notification renderer needs, +// while lifecycle remains responsible for deciding what should notify. type Event struct { - Type string - Priority Priority - SessionID domain.SessionID - ProjectID domain.ProjectID - Message string + Type string + Priority Priority + SessionID domain.SessionID + ProjectID domain.ProjectID + Message string + Reaction *ReactionEvent + Escalation *EscalationEvent + DedupeKey string + CauseKey string + OccurredAt time.Time +} + +type ReactionEvent struct { + Key string // agent-needs-input, approved-and-green, ci-failed, etc. + Action string // notify | escalated +} + +type EscalationEvent struct { + Attempts int + Cause string // max_retries | max_attempts | max_duration + DurationMs int64 } // ---- runtime / agent / workspace plugin ports (used by the Session Manager) ---- diff --git a/backend/internal/storage/sqlite/db.go b/backend/internal/storage/sqlite/db.go index 926d08d3..7f8535bf 100644 --- a/backend/internal/storage/sqlite/db.go +++ b/backend/internal/storage/sqlite/db.go @@ -1,5 +1,5 @@ -// Package sqlite is the durable persistence adapter: the 6-table schema (goose -// migrations), typed CRUD over sqlc-generated queries, and the read side of the +// Package sqlite is the durable persistence adapter: the goose-managed schema, +// typed CRUD over sqlc-generated queries, and the read side of the // trigger-driven CDC (it reads change_log; the DB triggers write it). package sqlite diff --git a/backend/internal/storage/sqlite/gen/models.go b/backend/internal/storage/sqlite/gen/models.go index 0c5b5c91..992c0ca0 100644 --- a/backend/internal/storage/sqlite/gen/models.go +++ b/backend/internal/storage/sqlite/gen/models.go @@ -18,6 +18,26 @@ type ChangeLog struct { CreatedAt time.Time } +type Notification struct { + Seq int64 + ID string + ProjectID string + SessionID string + Source string + EventType string + SemanticType string + Priority string + Message string + PayloadJson string + ActionsJson string + DedupeKey string + CauseKey string + ReadAt sql.NullTime + ArchivedAt sql.NullTime + CreatedAt time.Time + UpdatedAt time.Time +} + type Pr struct { Url string SessionID string diff --git a/backend/internal/storage/sqlite/gen/notifications.sql.go b/backend/internal/storage/sqlite/gen/notifications.sql.go new file mode 100644 index 00000000..7b2b5493 --- /dev/null +++ b/backend/internal/storage/sqlite/gen/notifications.sql.go @@ -0,0 +1,464 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.31.1 +// source: notifications.sql + +package gen + +import ( + "context" + "database/sql" + "time" +) + +const archiveNotification = `-- name: ArchiveNotification :one +UPDATE notifications +SET archived_at = ?, updated_at = ? +WHERE id = ? AND archived_at IS NULL +RETURNING seq, id, project_id, session_id, source, event_type, semantic_type, priority, + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at +` + +type ArchiveNotificationParams struct { + ArchivedAt sql.NullTime + UpdatedAt time.Time + ID string +} + +func (q *Queries) ArchiveNotification(ctx context.Context, arg ArchiveNotificationParams) (Notification, error) { + row := q.db.QueryRowContext(ctx, archiveNotification, arg.ArchivedAt, arg.UpdatedAt, arg.ID) + var i Notification + err := row.Scan( + &i.Seq, + &i.ID, + &i.ProjectID, + &i.SessionID, + &i.Source, + &i.EventType, + &i.SemanticType, + &i.Priority, + &i.Message, + &i.PayloadJson, + &i.ActionsJson, + &i.DedupeKey, + &i.CauseKey, + &i.ReadAt, + &i.ArchivedAt, + &i.CreatedAt, + &i.UpdatedAt, + ) + return i, err +} + +const getNotification = `-- name: GetNotification :one +SELECT seq, id, project_id, session_id, source, event_type, semantic_type, priority, + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at +FROM notifications WHERE id = ? +` + +func (q *Queries) GetNotification(ctx context.Context, id string) (Notification, error) { + row := q.db.QueryRowContext(ctx, getNotification, id) + var i Notification + err := row.Scan( + &i.Seq, + &i.ID, + &i.ProjectID, + &i.SessionID, + &i.Source, + &i.EventType, + &i.SemanticType, + &i.Priority, + &i.Message, + &i.PayloadJson, + &i.ActionsJson, + &i.DedupeKey, + &i.CauseKey, + &i.ReadAt, + &i.ArchivedAt, + &i.CreatedAt, + &i.UpdatedAt, + ) + return i, err +} + +const getNotificationByDedupeKey = `-- name: GetNotificationByDedupeKey :one +SELECT seq, id, project_id, session_id, source, event_type, semantic_type, priority, + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at +FROM notifications WHERE dedupe_key = ? +` + +func (q *Queries) GetNotificationByDedupeKey(ctx context.Context, dedupeKey string) (Notification, error) { + row := q.db.QueryRowContext(ctx, getNotificationByDedupeKey, dedupeKey) + var i Notification + err := row.Scan( + &i.Seq, + &i.ID, + &i.ProjectID, + &i.SessionID, + &i.Source, + &i.EventType, + &i.SemanticType, + &i.Priority, + &i.Message, + &i.PayloadJson, + &i.ActionsJson, + &i.DedupeKey, + &i.CauseKey, + &i.ReadAt, + &i.ArchivedAt, + &i.CreatedAt, + &i.UpdatedAt, + ) + return i, err +} + +const insertNotification = `-- name: InsertNotification :one +INSERT INTO notifications ( + project_id, session_id, source, event_type, semantic_type, priority, + message, payload_json, actions_json, dedupe_key, cause_key, created_at, updated_at +) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) +ON CONFLICT (dedupe_key) DO NOTHING +RETURNING seq, id, project_id, session_id, source, event_type, semantic_type, priority, + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at +` + +type InsertNotificationParams struct { + ProjectID string + SessionID string + Source string + EventType string + SemanticType string + Priority string + Message string + PayloadJson string + ActionsJson string + DedupeKey string + CauseKey string + CreatedAt time.Time + UpdatedAt time.Time +} + +func (q *Queries) InsertNotification(ctx context.Context, arg InsertNotificationParams) (Notification, error) { + row := q.db.QueryRowContext(ctx, insertNotification, + arg.ProjectID, + arg.SessionID, + arg.Source, + arg.EventType, + arg.SemanticType, + arg.Priority, + arg.Message, + arg.PayloadJson, + arg.ActionsJson, + arg.DedupeKey, + arg.CauseKey, + arg.CreatedAt, + arg.UpdatedAt, + ) + var i Notification + err := row.Scan( + &i.Seq, + &i.ID, + &i.ProjectID, + &i.SessionID, + &i.Source, + &i.EventType, + &i.SemanticType, + &i.Priority, + &i.Message, + &i.PayloadJson, + &i.ActionsJson, + &i.DedupeKey, + &i.CauseKey, + &i.ReadAt, + &i.ArchivedAt, + &i.CreatedAt, + &i.UpdatedAt, + ) + return i, err +} + +const listNotifications = `-- name: ListNotifications :many +SELECT seq, id, project_id, session_id, source, event_type, semantic_type, priority, + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at +FROM notifications +ORDER BY seq DESC +LIMIT ? +` + +func (q *Queries) ListNotifications(ctx context.Context, limit int64) ([]Notification, error) { + rows, err := q.db.QueryContext(ctx, listNotifications, limit) + if err != nil { + return nil, err + } + defer rows.Close() + items := []Notification{} + for rows.Next() { + var i Notification + if err := rows.Scan( + &i.Seq, + &i.ID, + &i.ProjectID, + &i.SessionID, + &i.Source, + &i.EventType, + &i.SemanticType, + &i.Priority, + &i.Message, + &i.PayloadJson, + &i.ActionsJson, + &i.DedupeKey, + &i.CauseKey, + &i.ReadAt, + &i.ArchivedAt, + &i.CreatedAt, + &i.UpdatedAt, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const listNotificationsByProject = `-- name: ListNotificationsByProject :many +SELECT seq, id, project_id, session_id, source, event_type, semantic_type, priority, + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at +FROM notifications +WHERE project_id = ? +ORDER BY seq DESC +LIMIT ? +` + +type ListNotificationsByProjectParams struct { + ProjectID string + Limit int64 +} + +func (q *Queries) ListNotificationsByProject(ctx context.Context, arg ListNotificationsByProjectParams) ([]Notification, error) { + rows, err := q.db.QueryContext(ctx, listNotificationsByProject, arg.ProjectID, arg.Limit) + if err != nil { + return nil, err + } + defer rows.Close() + items := []Notification{} + for rows.Next() { + var i Notification + if err := rows.Scan( + &i.Seq, + &i.ID, + &i.ProjectID, + &i.SessionID, + &i.Source, + &i.EventType, + &i.SemanticType, + &i.Priority, + &i.Message, + &i.PayloadJson, + &i.ActionsJson, + &i.DedupeKey, + &i.CauseKey, + &i.ReadAt, + &i.ArchivedAt, + &i.CreatedAt, + &i.UpdatedAt, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const listNotificationsBySession = `-- name: ListNotificationsBySession :many +SELECT seq, id, project_id, session_id, source, event_type, semantic_type, priority, + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at +FROM notifications +WHERE session_id = ? +ORDER BY seq DESC +LIMIT ? +` + +type ListNotificationsBySessionParams struct { + SessionID string + Limit int64 +} + +func (q *Queries) ListNotificationsBySession(ctx context.Context, arg ListNotificationsBySessionParams) ([]Notification, error) { + rows, err := q.db.QueryContext(ctx, listNotificationsBySession, arg.SessionID, arg.Limit) + if err != nil { + return nil, err + } + defer rows.Close() + items := []Notification{} + for rows.Next() { + var i Notification + if err := rows.Scan( + &i.Seq, + &i.ID, + &i.ProjectID, + &i.SessionID, + &i.Source, + &i.EventType, + &i.SemanticType, + &i.Priority, + &i.Message, + &i.PayloadJson, + &i.ActionsJson, + &i.DedupeKey, + &i.CauseKey, + &i.ReadAt, + &i.ArchivedAt, + &i.CreatedAt, + &i.UpdatedAt, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const listUnreadNotifications = `-- name: ListUnreadNotifications :many +SELECT seq, id, project_id, session_id, source, event_type, semantic_type, priority, + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at +FROM notifications +WHERE read_at IS NULL AND archived_at IS NULL +ORDER BY seq DESC +LIMIT ? +` + +func (q *Queries) ListUnreadNotifications(ctx context.Context, limit int64) ([]Notification, error) { + rows, err := q.db.QueryContext(ctx, listUnreadNotifications, limit) + if err != nil { + return nil, err + } + defer rows.Close() + items := []Notification{} + for rows.Next() { + var i Notification + if err := rows.Scan( + &i.Seq, + &i.ID, + &i.ProjectID, + &i.SessionID, + &i.Source, + &i.EventType, + &i.SemanticType, + &i.Priority, + &i.Message, + &i.PayloadJson, + &i.ActionsJson, + &i.DedupeKey, + &i.CauseKey, + &i.ReadAt, + &i.ArchivedAt, + &i.CreatedAt, + &i.UpdatedAt, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const markNotificationRead = `-- name: MarkNotificationRead :one +UPDATE notifications +SET read_at = ?, updated_at = ? +WHERE id = ? AND read_at IS NULL +RETURNING seq, id, project_id, session_id, source, event_type, semantic_type, priority, + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at +` + +type MarkNotificationReadParams struct { + ReadAt sql.NullTime + UpdatedAt time.Time + ID string +} + +func (q *Queries) MarkNotificationRead(ctx context.Context, arg MarkNotificationReadParams) (Notification, error) { + row := q.db.QueryRowContext(ctx, markNotificationRead, arg.ReadAt, arg.UpdatedAt, arg.ID) + var i Notification + err := row.Scan( + &i.Seq, + &i.ID, + &i.ProjectID, + &i.SessionID, + &i.Source, + &i.EventType, + &i.SemanticType, + &i.Priority, + &i.Message, + &i.PayloadJson, + &i.ActionsJson, + &i.DedupeKey, + &i.CauseKey, + &i.ReadAt, + &i.ArchivedAt, + &i.CreatedAt, + &i.UpdatedAt, + ) + return i, err +} + +const markNotificationUnread = `-- name: MarkNotificationUnread :one +UPDATE notifications +SET read_at = NULL, updated_at = ? +WHERE id = ? AND read_at IS NOT NULL +RETURNING seq, id, project_id, session_id, source, event_type, semantic_type, priority, + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at +` + +type MarkNotificationUnreadParams struct { + UpdatedAt time.Time + ID string +} + +func (q *Queries) MarkNotificationUnread(ctx context.Context, arg MarkNotificationUnreadParams) (Notification, error) { + row := q.db.QueryRowContext(ctx, markNotificationUnread, arg.UpdatedAt, arg.ID) + var i Notification + err := row.Scan( + &i.Seq, + &i.ID, + &i.ProjectID, + &i.SessionID, + &i.Source, + &i.EventType, + &i.SemanticType, + &i.Priority, + &i.Message, + &i.PayloadJson, + &i.ActionsJson, + &i.DedupeKey, + &i.CauseKey, + &i.ReadAt, + &i.ArchivedAt, + &i.CreatedAt, + &i.UpdatedAt, + ) + return i, err +} diff --git a/backend/internal/storage/sqlite/gen/querier.go b/backend/internal/storage/sqlite/gen/querier.go index 365113b1..4f91a9d5 100644 --- a/backend/internal/storage/sqlite/gen/querier.go +++ b/backend/internal/storage/sqlite/gen/querier.go @@ -9,21 +9,31 @@ import ( ) type Querier interface { + ArchiveNotification(ctx context.Context, arg ArchiveNotificationParams) (Notification, error) ArchiveProject(ctx context.Context, arg ArchiveProjectParams) error DeletePR(ctx context.Context, url string) error DeletePRComments(ctx context.Context, prUrl string) error DeleteSession(ctx context.Context, id string) error + GetNotification(ctx context.Context, id string) (Notification, error) + GetNotificationByDedupeKey(ctx context.Context, dedupeKey string) (Notification, error) GetPR(ctx context.Context, url string) (Pr, error) GetProject(ctx context.Context, id string) (Project, error) GetSession(ctx context.Context, id string) (Session, error) + InsertNotification(ctx context.Context, arg InsertNotificationParams) (Notification, error) InsertSession(ctx context.Context, arg InsertSessionParams) error ListAllSessions(ctx context.Context) ([]Session, error) ListChecksByPR(ctx context.Context, prUrl string) ([]PrCheck, error) + ListNotifications(ctx context.Context, limit int64) ([]Notification, error) + ListNotificationsByProject(ctx context.Context, arg ListNotificationsByProjectParams) ([]Notification, error) + ListNotificationsBySession(ctx context.Context, arg ListNotificationsBySessionParams) ([]Notification, error) ListPRComments(ctx context.Context, prUrl string) ([]PrComment, error) ListPRsBySession(ctx context.Context, sessionID string) ([]Pr, error) ListProjects(ctx context.Context) ([]Project, error) ListRecentChecks(ctx context.Context, arg ListRecentChecksParams) ([]ListRecentChecksRow, error) ListSessionsByProject(ctx context.Context, projectID string) ([]Session, error) + ListUnreadNotifications(ctx context.Context, limit int64) ([]Notification, error) + MarkNotificationRead(ctx context.Context, arg MarkNotificationReadParams) (Notification, error) + MarkNotificationUnread(ctx context.Context, arg MarkNotificationUnreadParams) (Notification, error) MaxChangeLogSeq(ctx context.Context) (interface{}, error) NextSessionNum(ctx context.Context, projectID string) (int64, error) ReadChangeLogAfter(ctx context.Context, arg ReadChangeLogAfterParams) ([]ChangeLog, error) diff --git a/backend/internal/storage/sqlite/migrations/0002_notifications.sql b/backend/internal/storage/sqlite/migrations/0002_notifications.sql new file mode 100644 index 00000000..1ab12f5b --- /dev/null +++ b/backend/internal/storage/sqlite/migrations/0002_notifications.sql @@ -0,0 +1,81 @@ +-- +goose Up +-- +goose StatementBegin +CREATE TABLE notifications ( + seq INTEGER PRIMARY KEY AUTOINCREMENT, + id TEXT NOT NULL UNIQUE DEFAULT ('ntf_' || lower(hex(randomblob(16)))), + project_id TEXT NOT NULL REFERENCES projects(id), + session_id TEXT NOT NULL REFERENCES sessions(id), + source TEXT NOT NULL DEFAULT 'lifecycle' CHECK (source IN ('lifecycle')), + event_type TEXT NOT NULL, + semantic_type TEXT NOT NULL, + priority TEXT NOT NULL CHECK (priority IN ('urgent','action','warning','info')), + message TEXT NOT NULL, + payload_json TEXT NOT NULL CHECK (json_valid(payload_json)), + actions_json TEXT NOT NULL DEFAULT '[]' CHECK (json_valid(actions_json)), + dedupe_key TEXT NOT NULL UNIQUE, + cause_key TEXT NOT NULL DEFAULT '', + read_at TIMESTAMP, + archived_at TIMESTAMP, + created_at TIMESTAMP NOT NULL DEFAULT (datetime('now')), + updated_at TIMESTAMP NOT NULL DEFAULT (datetime('now')) +); + +CREATE INDEX idx_notifications_project_seq ON notifications(project_id, seq DESC); +CREATE INDEX idx_notifications_session_seq ON notifications(session_id, seq DESC); +CREATE INDEX idx_notifications_unread ON notifications(seq DESC) + WHERE read_at IS NULL AND archived_at IS NULL; +-- +goose StatementEnd + +-- +goose StatementBegin +CREATE TRIGGER notifications_cdc_insert +AFTER INSERT ON notifications +BEGIN + INSERT INTO change_log (project_id, session_id, event_type, payload, created_at) + VALUES ( + NEW.project_id, + NEW.session_id, + 'notification_created', + json_object( + 'seq', NEW.seq, + 'id', NEW.id, + 'type', NEW.semantic_type, + 'priority', NEW.priority, + 'message', NEW.message, + 'data', json(NEW.payload_json), + 'actions', json(NEW.actions_json), + 'readAt', NEW.read_at, + 'archivedAt', NEW.archived_at + ), + NEW.created_at + ); +END; +-- +goose StatementEnd + +-- +goose StatementBegin +CREATE TRIGGER notifications_cdc_update +AFTER UPDATE ON notifications +WHEN OLD.read_at IS NOT NEW.read_at + OR OLD.archived_at IS NOT NEW.archived_at +BEGIN + INSERT INTO change_log (project_id, session_id, event_type, payload, created_at) + VALUES ( + NEW.project_id, + NEW.session_id, + 'notification_updated', + json_object( + 'seq', NEW.seq, + 'id', NEW.id, + 'readAt', NEW.read_at, + 'archivedAt', NEW.archived_at + ), + NEW.updated_at + ); +END; +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin +DROP TRIGGER IF EXISTS notifications_cdc_update; +DROP TRIGGER IF EXISTS notifications_cdc_insert; +DROP TABLE IF EXISTS notifications; +-- +goose StatementEnd diff --git a/backend/internal/storage/sqlite/notification_store.go b/backend/internal/storage/sqlite/notification_store.go new file mode 100644 index 00000000..90b84331 --- /dev/null +++ b/backend/internal/storage/sqlite/notification_store.go @@ -0,0 +1,242 @@ +package sqlite + +import ( + "context" + "database/sql" + "encoding/json" + "errors" + "fmt" + "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" + "github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite/gen" +) + +// NotificationRow is the storage-facing notification row. It aliases the +// provider-neutral domain type so callers do not depend on sqlc structs. +type NotificationRow = domain.Notification + +// NotificationFilter constrains ListNotifications. A zero filter returns the +// newest notifications across projects. +type NotificationFilter struct { + ProjectID string + SessionID string + UnreadOnly bool + Limit int +} + +const defaultNotificationLimit = 100 + +// EnqueueNotification inserts a notification exactly once per dedupe key. The +// returned bool is true when a new row was created; false means the existing row +// for the same dedupe key was returned. +func (s *Store) EnqueueNotification(ctx context.Context, row NotificationRow) (NotificationRow, bool, error) { + row = normalizeNotification(row) + actionsJSON, err := json.Marshal(row.Actions) + if err != nil { + return NotificationRow{}, false, fmt.Errorf("marshal notification actions: %w", err) + } + + s.writeMu.Lock() + defer s.writeMu.Unlock() + + got, err := s.qw.InsertNotification(ctx, gen.InsertNotificationParams{ + ProjectID: string(row.ProjectID), + SessionID: string(row.SessionID), + Source: row.Source, + EventType: row.EventType, + SemanticType: row.SemanticType, + Priority: row.Priority, + Message: row.Message, + PayloadJson: string(row.Payload), + ActionsJson: string(actionsJSON), + DedupeKey: row.DedupeKey, + CauseKey: row.CauseKey, + CreatedAt: row.CreatedAt, + UpdatedAt: row.UpdatedAt, + }) + if errors.Is(err, sql.ErrNoRows) { + existing, readErr := s.qw.GetNotificationByDedupeKey(ctx, row.DedupeKey) + if readErr != nil { + return NotificationRow{}, false, fmt.Errorf("get notification by dedupe %q: %w", row.DedupeKey, readErr) + } + mapped, mapErr := notificationFromGen(existing) + return mapped, false, mapErr + } + if err != nil { + return NotificationRow{}, false, fmt.Errorf("insert notification: %w", err) + } + mapped, err := notificationFromGen(got) + return mapped, true, err +} + +// GetNotification returns one notification by id, or ok=false if absent. +func (s *Store) GetNotification(ctx context.Context, id string) (NotificationRow, bool, error) { + row, err := s.qr.GetNotification(ctx, id) + if errors.Is(err, sql.ErrNoRows) { + return NotificationRow{}, false, nil + } + if err != nil { + return NotificationRow{}, false, fmt.Errorf("get notification %s: %w", id, err) + } + mapped, err := notificationFromGen(row) + return mapped, true, err +} + +// ListNotifications returns notifications in descending seq order. +func (s *Store) ListNotifications(ctx context.Context, filter NotificationFilter) ([]NotificationRow, error) { + limit := int64(filter.Limit) + if limit <= 0 { + limit = defaultNotificationLimit + } + + var ( + rows []gen.Notification + err error + ) + switch { + case filter.UnreadOnly: + rows, err = s.qr.ListUnreadNotifications(ctx, limit) + case filter.SessionID != "": + rows, err = s.qr.ListNotificationsBySession(ctx, gen.ListNotificationsBySessionParams{SessionID: filter.SessionID, Limit: limit}) + case filter.ProjectID != "": + rows, err = s.qr.ListNotificationsByProject(ctx, gen.ListNotificationsByProjectParams{ProjectID: filter.ProjectID, Limit: limit}) + default: + rows, err = s.qr.ListNotifications(ctx, limit) + } + if err != nil { + return nil, fmt.Errorf("list notifications: %w", err) + } + return notificationsFromGen(rows) +} + +// MarkNotificationRead marks an unread notification read. The returned bool is +// true only when the row changed; repeated calls return the existing row with +// changed=false and emit no CDC update. +func (s *Store) MarkNotificationRead(ctx context.Context, id string, at time.Time) (NotificationRow, bool, error) { + if at.IsZero() { + at = time.Now().UTC() + } + s.writeMu.Lock() + defer s.writeMu.Unlock() + + row, err := s.qw.MarkNotificationRead(ctx, gen.MarkNotificationReadParams{ + ReadAt: sql.NullTime{Time: at, Valid: true}, + UpdatedAt: at, + ID: id, + }) + return s.changedNotificationResult(ctx, row, id, true, err) +} + +// MarkNotificationUnread clears read_at. Repeated calls are idempotent and emit +// no CDC update. +func (s *Store) MarkNotificationUnread(ctx context.Context, id string) (NotificationRow, bool, error) { + now := time.Now().UTC() + s.writeMu.Lock() + defer s.writeMu.Unlock() + + row, err := s.qw.MarkNotificationUnread(ctx, gen.MarkNotificationUnreadParams{UpdatedAt: now, ID: id}) + return s.changedNotificationResult(ctx, row, id, true, err) +} + +// ArchiveNotification marks a notification archived. Repeated calls are +// idempotent and emit no CDC update. +func (s *Store) ArchiveNotification(ctx context.Context, id string, at time.Time) (NotificationRow, bool, error) { + if at.IsZero() { + at = time.Now().UTC() + } + s.writeMu.Lock() + defer s.writeMu.Unlock() + + row, err := s.qw.ArchiveNotification(ctx, gen.ArchiveNotificationParams{ + ArchivedAt: sql.NullTime{Time: at, Valid: true}, + UpdatedAt: at, + ID: id, + }) + return s.changedNotificationResult(ctx, row, id, true, err) +} + +func (s *Store) changedNotificationResult(ctx context.Context, row gen.Notification, id string, changed bool, err error) (NotificationRow, bool, error) { + if errors.Is(err, sql.ErrNoRows) { + existing, readErr := s.qw.GetNotification(ctx, id) + if errors.Is(readErr, sql.ErrNoRows) { + return NotificationRow{}, false, nil + } + if readErr != nil { + return NotificationRow{}, false, fmt.Errorf("get notification %s: %w", id, readErr) + } + mapped, mapErr := notificationFromGen(existing) + return mapped, false, mapErr + } + if err != nil { + return NotificationRow{}, false, err + } + mapped, mapErr := notificationFromGen(row) + return mapped, changed, mapErr +} + +func normalizeNotification(row NotificationRow) NotificationRow { + now := time.Now().UTC() + if row.Source == "" { + row.Source = "lifecycle" + } + if len(row.Payload) == 0 { + row.Payload = json.RawMessage(`{}`) + } + if row.Actions == nil { + row.Actions = []domain.NotificationAction{} + } + if row.CreatedAt.IsZero() { + row.CreatedAt = now + } + if row.UpdatedAt.IsZero() { + row.UpdatedAt = row.CreatedAt + } + return row +} + +func notificationsFromGen(rows []gen.Notification) ([]NotificationRow, error) { + out := make([]NotificationRow, 0, len(rows)) + for _, r := range rows { + row, err := notificationFromGen(r) + if err != nil { + return nil, err + } + out = append(out, row) + } + return out, nil +} + +func notificationFromGen(r gen.Notification) (NotificationRow, error) { + var actions []domain.NotificationAction + if r.ActionsJson == "" { + r.ActionsJson = "[]" + } + if err := json.Unmarshal([]byte(r.ActionsJson), &actions); err != nil { + return NotificationRow{}, fmt.Errorf("decode notification actions %s: %w", r.ID, err) + } + row := NotificationRow{ + Seq: r.Seq, + ID: domain.NotificationID(r.ID), + ProjectID: domain.ProjectID(r.ProjectID), + SessionID: domain.SessionID(r.SessionID), + Source: r.Source, + EventType: r.EventType, + SemanticType: r.SemanticType, + Priority: r.Priority, + Message: r.Message, + Payload: append(json.RawMessage(nil), []byte(r.PayloadJson)...), + Actions: actions, + DedupeKey: r.DedupeKey, + CauseKey: r.CauseKey, + CreatedAt: r.CreatedAt, + UpdatedAt: r.UpdatedAt, + } + if r.ReadAt.Valid { + row.ReadAt = r.ReadAt.Time + } + if r.ArchivedAt.Valid { + row.ArchivedAt = r.ArchivedAt.Time + } + return row, nil +} diff --git a/backend/internal/storage/sqlite/notification_store_test.go b/backend/internal/storage/sqlite/notification_store_test.go new file mode 100644 index 00000000..cd5c44a9 --- /dev/null +++ b/backend/internal/storage/sqlite/notification_store_test.go @@ -0,0 +1,232 @@ +package sqlite + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "sync" + "testing" + "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" + "github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite/gen" +) + +func TestNotificationInsertListGetAndDedupe(t *testing.T) { + s, rec := newNotificationTestSession(t) + ctx := context.Background() + + row, created, err := s.EnqueueNotification(ctx, sampleNotification(rec, "dedupe-1")) + if err != nil { + t.Fatal(err) + } + if !created || row.ID == "" || row.Seq == 0 { + t.Fatalf("enqueue created=%v row=%+v", created, row) + } + got, ok, err := s.GetNotification(ctx, string(row.ID)) + if err != nil || !ok { + t.Fatalf("get ok=%v err=%v", ok, err) + } + if got.DedupeKey != "dedupe-1" || got.Actions[0].ID != "open-session" { + t.Fatalf("get mismatch: %+v", got) + } + dup, created, err := s.EnqueueNotification(ctx, sampleNotification(rec, "dedupe-1")) + if err != nil { + t.Fatal(err) + } + if created || dup.ID != row.ID || dup.Seq != row.Seq { + t.Fatalf("duplicate should return existing row created=false: created=%v dup=%+v first=%+v", created, dup, row) + } + all, err := s.ListNotifications(ctx, NotificationFilter{Limit: 10}) + if err != nil || len(all) != 1 { + t.Fatalf("list all len=%d err=%v", len(all), err) + } + byProject, _ := s.ListNotifications(ctx, NotificationFilter{ProjectID: string(rec.ProjectID), Limit: 10}) + bySession, _ := s.ListNotifications(ctx, NotificationFilter{SessionID: string(rec.ID), Limit: 10}) + if len(byProject) != 1 || len(bySession) != 1 { + t.Fatalf("project/session lists = %d/%d", len(byProject), len(bySession)) + } +} + +func TestNotificationReadUnreadArchiveAndIdempotentCDC(t *testing.T) { + s, rec := newNotificationTestSession(t) + ctx := context.Background() + row, _, err := s.EnqueueNotification(ctx, sampleNotification(rec, "dedupe-read")) + if err != nil { + t.Fatal(err) + } + createdSeq, _ := s.MaxChangeLogSeq(ctx) + + readAt := time.Date(2026, 1, 2, 3, 4, 5, 0, time.UTC) + read, changed, err := s.MarkNotificationRead(ctx, string(row.ID), readAt) + if err != nil || !changed { + t.Fatalf("mark read changed=%v err=%v", changed, err) + } + if read.ReadAt.IsZero() { + t.Fatal("read_at not set") + } + afterRead, _ := s.MaxChangeLogSeq(ctx) + if afterRead != createdSeq+1 { + t.Fatalf("read should emit one CDC event: before=%d after=%d", createdSeq, afterRead) + } + _, changed, err = s.MarkNotificationRead(ctx, string(row.ID), readAt.Add(time.Second)) + if err != nil || changed { + t.Fatalf("repeated mark read should be idempotent changed=false, got changed=%v err=%v", changed, err) + } + afterRepeat, _ := s.MaxChangeLogSeq(ctx) + if afterRepeat != afterRead { + t.Fatalf("repeated read emitted CDC: before=%d after=%d", afterRead, afterRepeat) + } + + unread, changed, err := s.MarkNotificationUnread(ctx, string(row.ID)) + if err != nil || !changed || !unread.ReadAt.IsZero() { + t.Fatalf("mark unread changed=%v err=%v row=%+v", changed, err, unread) + } + unreadList, err := s.ListNotifications(ctx, NotificationFilter{UnreadOnly: true, Limit: 10}) + if err != nil || len(unreadList) != 1 { + t.Fatalf("unread list len=%d err=%v", len(unreadList), err) + } + + archiveSeq, _ := s.MaxChangeLogSeq(ctx) + archived, changed, err := s.ArchiveNotification(ctx, string(row.ID), readAt.Add(2*time.Second)) + if err != nil || !changed || archived.ArchivedAt.IsZero() { + t.Fatalf("archive changed=%v err=%v row=%+v", changed, err, archived) + } + afterArchive, _ := s.MaxChangeLogSeq(ctx) + if afterArchive != archiveSeq+1 { + t.Fatalf("archive should emit one CDC event: before=%d after=%d", archiveSeq, afterArchive) + } + _, changed, err = s.ArchiveNotification(ctx, string(row.ID), readAt.Add(3*time.Second)) + if err != nil || changed { + t.Fatalf("repeated archive should be idempotent changed=false, got changed=%v err=%v", changed, err) + } + afterArchiveRepeat, _ := s.MaxChangeLogSeq(ctx) + if afterArchiveRepeat != afterArchive { + t.Fatalf("repeated archive emitted CDC: before=%d after=%d", afterArchive, afterArchiveRepeat) + } +} + +func TestNotificationJSONConstraintsRejectInvalidPayloadAndActions(t *testing.T) { + s, rec := newNotificationTestSession(t) + ctx := context.Background() + + badPayload := sampleNotification(rec, "bad-payload") + badPayload.Payload = json.RawMessage(`{"nope"`) + if _, _, err := s.EnqueueNotification(ctx, badPayload); err == nil { + t.Fatal("invalid payload JSON should be rejected") + } + + now := time.Now().UTC().Truncate(time.Second) + _, err := s.qw.InsertNotification(ctx, gen.InsertNotificationParams{ + ProjectID: string(rec.ProjectID), + SessionID: string(rec.ID), + Source: "lifecycle", + EventType: "reaction.agent-needs-input", + SemanticType: "session.needs_input", + Priority: "urgent", + Message: "bad actions", + PayloadJson: `{}`, + ActionsJson: `{not-json`, + DedupeKey: "bad-actions", + CauseKey: "agent-needs-input", + CreatedAt: now, + UpdatedAt: now, + }) + if err == nil { + t.Fatal("invalid actions JSON should be rejected") + } +} + +func TestNotificationCDCForCreateReadArchive(t *testing.T) { + s, rec := newNotificationTestSession(t) + ctx := context.Background() + startSeq, _ := s.MaxChangeLogSeq(ctx) + row, _, err := s.EnqueueNotification(ctx, sampleNotification(rec, "dedupe-cdc")) + if err != nil { + t.Fatal(err) + } + _, _, _ = s.MarkNotificationRead(ctx, string(row.ID), time.Now().UTC()) + _, _, _ = s.ArchiveNotification(ctx, string(row.ID), time.Now().UTC()) + + evs, err := s.ReadChangeLogAfter(ctx, startSeq, 10) + if err != nil { + t.Fatal(err) + } + var types []string + for _, e := range evs { + types = append(types, e.EventType) + if e.EventType == "notification_created" && !strings.Contains(e.Payload, `"data"`) { + t.Fatalf("notification_created payload missing data: %s", e.Payload) + } + } + want := []string{"notification_created", "notification_updated", "notification_updated"} + if fmt.Sprint(types) != fmt.Sprint(want) { + t.Fatalf("notification CDC types = %v, want %v", types, want) + } +} + +func TestConcurrentNotificationEnqueueSameDedupeCreatesOneRow(t *testing.T) { + s, rec := newNotificationTestSession(t) + ctx := context.Background() + const n = 20 + var wg sync.WaitGroup + ids := make(chan domain.NotificationID, n) + for i := 0; i < n; i++ { + wg.Add(1) + go func() { + defer wg.Done() + row, _, err := s.EnqueueNotification(ctx, sampleNotification(rec, "dedupe-concurrent")) + if err != nil { + t.Errorf("enqueue: %v", err) + return + } + ids <- row.ID + }() + } + wg.Wait() + close(ids) + var first domain.NotificationID + for id := range ids { + if first == "" { + first = id + } + if id != first { + t.Fatalf("all callers should see same id, got %q and %q", first, id) + } + } + rows, err := s.ListNotifications(ctx, NotificationFilter{Limit: 10}) + if err != nil || len(rows) != 1 { + t.Fatalf("list len=%d err=%v", len(rows), err) + } +} + +func newNotificationTestSession(t *testing.T) (*Store, domain.SessionRecord) { + t.Helper() + s := newTestStore(t) + seedProject(t, s, "ao") + rec, err := s.CreateSession(context.Background(), sampleRecord("ao")) + if err != nil { + t.Fatalf("create session: %v", err) + } + return s, rec +} + +func sampleNotification(rec domain.SessionRecord, dedupe string) NotificationRow { + now := time.Now().UTC().Truncate(time.Second) + return NotificationRow{ + ProjectID: rec.ProjectID, + SessionID: rec.ID, + Source: "lifecycle", + EventType: "reaction.agent-needs-input", + SemanticType: "session.needs_input", + Priority: "urgent", + Message: "Agent needs input to continue.", + Payload: json.RawMessage(`{"schemaVersion":3,"semanticType":"session.needs_input"}`), + Actions: []domain.NotificationAction{{ID: "open-session", Kind: "route", Label: "Open session", Route: "/projects/ao/sessions/ao-1"}}, + DedupeKey: dedupe, + CauseKey: "agent-needs-input", + CreatedAt: now, + UpdatedAt: now, + } +} diff --git a/backend/internal/storage/sqlite/pr_facts.go b/backend/internal/storage/sqlite/pr_facts.go new file mode 100644 index 00000000..d72f2978 --- /dev/null +++ b/backend/internal/storage/sqlite/pr_facts.go @@ -0,0 +1,45 @@ +package sqlite + +import ( + "context" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" +) + +// PRFactsForSession picks the PR that drives display/reaction status — the +// newest non-closed PR, else the newest PR — and folds in whether it has +// unresolved review comments. +func (s *Store) PRFactsForSession(ctx context.Context, id domain.SessionID) (domain.PRFacts, error) { + rows, err := s.ListPRsBySession(ctx, string(id)) + if err != nil { + return domain.PRFacts{}, err + } + if len(rows) == 0 { + return domain.PRFacts{}, nil + } + pick := rows[0] + for _, r := range rows { + if r.State == "draft" || r.State == "open" { + pick = r + break + } + } + facts := domain.PRFacts{ + URL: pick.URL, Number: int(pick.Number), Exists: true, + Draft: pick.State == "draft", Merged: pick.State == "merged", Closed: pick.State == "closed", + CI: domain.CIState(pick.CIState), + Review: domain.ReviewDecision(pick.ReviewDecision), + Mergeability: domain.Mergeability(pick.Mergeability), + } + comments, err := s.ListPRComments(ctx, pick.URL) + if err != nil { + return domain.PRFacts{}, err + } + for _, c := range comments { + if !c.Resolved { + facts.ReviewComments = true + break + } + } + return facts, nil +} diff --git a/backend/internal/storage/sqlite/queries/notifications.sql b/backend/internal/storage/sqlite/queries/notifications.sql new file mode 100644 index 00000000..a896b43c --- /dev/null +++ b/backend/internal/storage/sqlite/queries/notifications.sql @@ -0,0 +1,70 @@ +-- name: InsertNotification :one +INSERT INTO notifications ( + project_id, session_id, source, event_type, semantic_type, priority, + message, payload_json, actions_json, dedupe_key, cause_key, created_at, updated_at +) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) +ON CONFLICT (dedupe_key) DO NOTHING +RETURNING seq, id, project_id, session_id, source, event_type, semantic_type, priority, + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at; + +-- name: GetNotification :one +SELECT seq, id, project_id, session_id, source, event_type, semantic_type, priority, + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at +FROM notifications WHERE id = ?; + +-- name: GetNotificationByDedupeKey :one +SELECT seq, id, project_id, session_id, source, event_type, semantic_type, priority, + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at +FROM notifications WHERE dedupe_key = ?; + +-- name: ListNotifications :many +SELECT seq, id, project_id, session_id, source, event_type, semantic_type, priority, + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at +FROM notifications +ORDER BY seq DESC +LIMIT ?; + +-- name: ListNotificationsByProject :many +SELECT seq, id, project_id, session_id, source, event_type, semantic_type, priority, + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at +FROM notifications +WHERE project_id = ? +ORDER BY seq DESC +LIMIT ?; + +-- name: ListNotificationsBySession :many +SELECT seq, id, project_id, session_id, source, event_type, semantic_type, priority, + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at +FROM notifications +WHERE session_id = ? +ORDER BY seq DESC +LIMIT ?; + +-- name: ListUnreadNotifications :many +SELECT seq, id, project_id, session_id, source, event_type, semantic_type, priority, + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at +FROM notifications +WHERE read_at IS NULL AND archived_at IS NULL +ORDER BY seq DESC +LIMIT ?; + +-- name: MarkNotificationRead :one +UPDATE notifications +SET read_at = ?, updated_at = ? +WHERE id = ? AND read_at IS NULL +RETURNING seq, id, project_id, session_id, source, event_type, semantic_type, priority, + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at; + +-- name: MarkNotificationUnread :one +UPDATE notifications +SET read_at = NULL, updated_at = ? +WHERE id = ? AND read_at IS NOT NULL +RETURNING seq, id, project_id, session_id, source, event_type, semantic_type, priority, + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at; + +-- name: ArchiveNotification :one +UPDATE notifications +SET archived_at = ?, updated_at = ? +WHERE id = ? AND archived_at IS NULL +RETURNING seq, id, project_id, session_id, source, event_type, semantic_type, priority, + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at; diff --git a/backend/lifecycle_wiring.go b/backend/lifecycle_wiring.go index 8aecd470..35eac385 100644 --- a/backend/lifecycle_wiring.go +++ b/backend/lifecycle_wiring.go @@ -11,6 +11,7 @@ import ( "github.com/aoagents/agent-orchestrator/backend/internal/config" "github.com/aoagents/agent-orchestrator/backend/internal/domain" "github.com/aoagents/agent-orchestrator/backend/internal/lifecycle" + "github.com/aoagents/agent-orchestrator/backend/internal/notification" "github.com/aoagents/agent-orchestrator/backend/internal/observe/reaper" "github.com/aoagents/agent-orchestrator/backend/internal/ports" "github.com/aoagents/agent-orchestrator/backend/internal/session" @@ -33,13 +34,14 @@ type lifecycleStack struct { // The goroutine stops when ctx is cancelled; Stop waits for it to drain. // // TEMPORARY STUBS (replace as the daemon lane lands the collaborators): -// - noopNotifier — swap for the notifier multiplexer (desktop/Slack/webhook). // - noopMessenger — swap for the runtime/agent-plugin-backed AgentMessenger. // - reaper.MapRegistry{} — empty runtime registry, so the reaper ticks // escalations but probes nothing until the runtime plugins exist. func startLifecycle(ctx context.Context, store *sqlite.Store, logger *slog.Logger) (*lifecycleStack, error) { a := wiring.Adapter{Store: store} - lcm := lifecycle.New(a, a, noopNotifier{}, noopMessenger{}) + renderer := notification.NewRenderer(store) + notifier := notification.NewEnqueuer(store, renderer, logger) + lcm := lifecycle.New(a, a, notifier, noopMessenger{}) rp := reaper.New(lcm, reaper.MapRegistry{}, reaper.Config{Logger: logger}) return &lifecycleStack{LCM: lcm, Adapter: a, reaperDone: rp.Start(ctx)}, nil } @@ -94,13 +96,9 @@ func startSession(ctx context.Context, cfg config.Config, ls *lifecycleStack, lo return &sessionStack{SM: sm}, nil } -// noopNotifier / noopMessenger are TEMPORARY stubs (see startLifecycle): the -// write path and CDC work without them; only the human push / agent nudge are -// absent until the real plugins are wired. -type noopNotifier struct{} - -func (noopNotifier) Notify(context.Context, ports.Event) error { return nil } - +// noopMessenger is a TEMPORARY stub (see startLifecycle): the canonical write +// path and durable notifications work without it; only live agent nudges are +// absent until the real runtime/agent plugin is wired. type noopMessenger struct{} func (noopMessenger) Send(context.Context, domain.SessionID, string) error { return nil } diff --git a/backend/wiring_test.go b/backend/wiring_test.go index 14bb3b4c..6a372c6c 100644 --- a/backend/wiring_test.go +++ b/backend/wiring_test.go @@ -14,6 +14,7 @@ import ( "github.com/aoagents/agent-orchestrator/backend/internal/config" "github.com/aoagents/agent-orchestrator/backend/internal/domain" "github.com/aoagents/agent-orchestrator/backend/internal/lifecycle" + "github.com/aoagents/agent-orchestrator/backend/internal/notification" "github.com/aoagents/agent-orchestrator/backend/internal/ports" "github.com/aoagents/agent-orchestrator/backend/internal/session" "github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite" @@ -32,7 +33,10 @@ func TestWiring_WriteFlowsToBroadcaster(t *testing.T) { defer store.Close() a := wiring.Adapter{Store: store} - lcm := lifecycle.New(a, a, noopNotifier{}, noopMessenger{}) + renderer := notification.NewRenderer(store) + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) + notifier := notification.NewEnqueuer(store, renderer, logger) + lcm := lifecycle.New(a, a, notifier, noopMessenger{}) bcast := cdc.NewBroadcaster() poller := cdc.NewPoller(cdcSource{store}, bcast, cdc.PollerConfig{})