diff --git a/backend/internal/daemon/daemon.go b/backend/internal/daemon/daemon.go index 260ccd5..a8d7f5b 100644 --- a/backend/internal/daemon/daemon.go +++ b/backend/internal/daemon/daemon.go @@ -14,7 +14,9 @@ import ( "github.com/aoagents/agent-orchestrator/backend/internal/adapters/runtime/zellij" "github.com/aoagents/agent-orchestrator/backend/internal/config" "github.com/aoagents/agent-orchestrator/backend/internal/httpd" + "github.com/aoagents/agent-orchestrator/backend/internal/notify" "github.com/aoagents/agent-orchestrator/backend/internal/runfile" + notificationsvc "github.com/aoagents/agent-orchestrator/backend/internal/service/notification" projectsvc "github.com/aoagents/agent-orchestrator/backend/internal/service/project" reviewsvc "github.com/aoagents/agent-orchestrator/backend/internal/service/review" "github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite" @@ -82,11 +84,14 @@ func Run() error { // Built before the Lifecycle Manager so the LCM can use it for SCM-driven // agent nudges (CI failure, review feedback, merge conflict). messenger := newSessionMessenger(store, runtimeAdapter, log) + notificationHub := notify.NewHub() + notifier := notificationsvc.New(notificationsvc.Deps{Store: store}) + notificationWriter := notify.New(notify.Deps{Store: store, Publisher: notificationHub}) // Bring up the Lifecycle Manager and the reaper first: it makes the session // lifecycle write path live (reducer write -> store -> DB trigger -> // change_log -> poller -> broadcaster) and gives startSession the shared LCM. - lcStack := startLifecycle(ctx, store, runtimeAdapter, messenger, log) + lcStack := startLifecycle(ctx, store, runtimeAdapter, messenger, notificationWriter, log) lcStack.scmDone = startSCMObserver(ctx, store, lcStack.LCM, log) // Wire the controller-facing session service over the same store + LCM, the @@ -104,12 +109,14 @@ func Run() error { } srv, err := httpd.NewWithDeps(cfg, log, termMgr, httpd.APIDeps{ - Projects: projectsvc.New(store), - Sessions: sessionSvc, - Reviews: reviewsvc.NewInMemory(), - CDC: store, - Events: cdcPipe.Broadcaster, - Activity: lcStack.LCM, + Projects: projectsvc.New(store), + Sessions: sessionSvc, + Reviews: reviewsvc.NewInMemory(), + Notifications: notifier, + NotificationStream: notificationHub, + CDC: store, + Events: cdcPipe.Broadcaster, + Activity: lcStack.LCM, }) if err != nil { stop() diff --git a/backend/internal/daemon/lifecycle_wiring.go b/backend/internal/daemon/lifecycle_wiring.go index b4d4090..c57e506 100644 --- a/backend/internal/daemon/lifecycle_wiring.go +++ b/backend/internal/daemon/lifecycle_wiring.go @@ -20,6 +20,10 @@ import ( "github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite" ) +type notificationSink interface { + Notify(context.Context, ports.NotificationIntent) error +} + // lifecycleStack owns the runtime reaper goroutine started with the lifecycle // reducer. The reducer itself is only used for wiring observations into storage. type lifecycleStack struct { @@ -35,8 +39,8 @@ type lifecycleStack struct { // reaper. The goroutine stops when ctx is cancelled; Stop waits for it to drain. // The messenger is the per-daemon agent messenger the LCM uses to nudge agents // in response to SCM observations (CI failure, review feedback, merge conflict). -func startLifecycle(ctx context.Context, store *sqlite.Store, runtime ports.Runtime, messenger ports.AgentMessenger, logger *slog.Logger) *lifecycleStack { - lcm := lifecycle.New(store, messenger) +func startLifecycle(ctx context.Context, store *sqlite.Store, runtime ports.Runtime, messenger ports.AgentMessenger, notifier notificationSink, logger *slog.Logger) *lifecycleStack { + lcm := lifecycle.New(store, messenger, lifecycle.WithNotificationSink(notifier)) rp := reaper.New(lcm, store, runtime, reaper.Config{Logger: logger}) return &lifecycleStack{LCM: lcm, reaperDone: rp.Start(ctx)} } diff --git a/backend/internal/daemon/wiring_test.go b/backend/internal/daemon/wiring_test.go index 0e4815d..df194ad 100644 --- a/backend/internal/daemon/wiring_test.go +++ b/backend/internal/daemon/wiring_test.go @@ -315,7 +315,7 @@ func TestWiring_StartLifecycleThreadsMessengerIntoLCM(t *testing.T) { log := slog.New(slog.NewTextHandler(io.Discard, nil)) messenger := &captureMessenger{} - stack := startLifecycle(ctx, store, zellij.New(zellij.Options{}), messenger, log) + stack := startLifecycle(ctx, store, zellij.New(zellij.Options{}), messenger, nil, log) t.Cleanup(stack.Stop) t.Cleanup(cancel) diff --git a/backend/internal/domain/notification.go b/backend/internal/domain/notification.go new file mode 100644 index 0000000..ed5a8aa --- /dev/null +++ b/backend/internal/domain/notification.go @@ -0,0 +1,86 @@ +package domain + +import ( + "errors" + "time" +) + +// NotificationType identifies a user-facing notification kind persisted for the dashboard. +type NotificationType string + +const ( + // NotificationNeedsInput means an agent session is waiting for user input. + NotificationNeedsInput NotificationType = "needs_input" + // NotificationReadyToMerge means a PR has no known merge blockers. + NotificationReadyToMerge NotificationType = "ready_to_merge" + // NotificationPRMerged means a tracked PR was merged. + NotificationPRMerged NotificationType = "pr_merged" + // NotificationPRClosedUnmerged means a tracked PR closed without merging. + NotificationPRClosedUnmerged NotificationType = "pr_closed_unmerged" +) + +// Valid reports whether t is one of the v1 notification kinds. +func (t NotificationType) Valid() bool { + switch t { + case NotificationNeedsInput, NotificationReadyToMerge, NotificationPRMerged, NotificationPRClosedUnmerged: + return true + default: + return false + } +} + +// NotificationStatus is the read state for a stored notification. +type NotificationStatus string + +const ( + // NotificationUnread marks a notification that has not been acknowledged. + NotificationUnread NotificationStatus = "unread" + // NotificationRead marks a notification that has been acknowledged. + NotificationRead NotificationStatus = "read" +) + +// Valid reports whether s is a supported notification read state. +func (s NotificationStatus) Valid() bool { + switch s { + case NotificationUnread, NotificationRead: + return true + default: + return false + } +} + +// NotificationRecord is the durable notification persistence shape. +type NotificationRecord struct { + ID string + SessionID SessionID + ProjectID ProjectID + PRURL string + Type NotificationType + Title string + Body string + Status NotificationStatus + CreatedAt time.Time +} + +var ( + // ErrInvalidNotificationType reports an unknown notification type. + ErrInvalidNotificationType = errors.New("invalid notification type") + // ErrInvalidNotificationStatus reports an unknown notification status. + ErrInvalidNotificationStatus = errors.New("invalid notification status") + // ErrInvalidNotificationRecord reports a missing required notification field. + ErrInvalidNotificationRecord = errors.New("invalid notification record") +) + +// Validate checks the required fields and enum values for a stored notification. +func (r NotificationRecord) Validate() error { + if r.SessionID == "" || r.ProjectID == "" || r.Title == "" || r.CreatedAt.IsZero() { + return ErrInvalidNotificationRecord + } + if !r.Type.Valid() { + return ErrInvalidNotificationType + } + if !r.Status.Valid() { + return ErrInvalidNotificationStatus + } + return nil +} diff --git a/backend/internal/httpd/api.go b/backend/internal/httpd/api.go index d3969e8..40b65d8 100644 --- a/backend/internal/httpd/api.go +++ b/backend/internal/httpd/api.go @@ -18,24 +18,27 @@ import ( // APIDeps bundles every service the API layer's controllers depend on. type APIDeps struct { - Projects projectsvc.Manager - Sessions controllers.SessionService - Activity controllers.ActivityRecorder - PRs prsvc.ActionManager - Reviews reviewsvc.Manager - CDC cdc.Source - Events cdcSubscriber + Projects projectsvc.Manager + Sessions controllers.SessionService + Activity controllers.ActivityRecorder + PRs prsvc.ActionManager + Reviews reviewsvc.Manager + Notifications controllers.NotificationService + NotificationStream controllers.NotificationStream + CDC cdc.Source + Events cdcSubscriber } // API owns one controller per resource and is the single Register call the // router invokes to mount the /api/v1 surface. type API struct { - cfg config.Config - projects *controllers.ProjectsController - sessions *controllers.SessionsController - prs *controllers.PRsController - reviews *controllers.ReviewsController - events *EventsController + cfg config.Config + projects *controllers.ProjectsController + sessions *controllers.SessionsController + prs *controllers.PRsController + reviews *controllers.ReviewsController + notifications *controllers.NotificationsController + events *EventsController } // NewAPI constructs the API surface from its dependencies. cfg carries the @@ -51,9 +54,10 @@ func NewAPI(cfg config.Config, deps APIDeps) *API { Svc: deps.Sessions, Activity: deps.Activity, }, - prs: &controllers.PRsController{Svc: deps.PRs}, - reviews: &controllers.ReviewsController{Svc: deps.Reviews}, - events: &EventsController{Source: deps.CDC, Live: deps.Events}, + prs: &controllers.PRsController{Svc: deps.PRs}, + reviews: &controllers.ReviewsController{Svc: deps.Reviews}, + notifications: &controllers.NotificationsController{Svc: deps.Notifications, Stream: deps.NotificationStream}, + events: &EventsController{Source: deps.CDC, Live: deps.Events}, } } @@ -75,9 +79,11 @@ func (a *API) Register(root chi.Router) { a.sessions.Register(r) a.prs.Register(r) a.reviews.Register(r) + a.notifications.Register(r) // Sibling REST controllers plug in here. }) // Long-lived streams intentionally bypass the REST timeout middleware. + a.notifications.RegisterStream(r) a.events.Register(r) }) } diff --git a/backend/internal/httpd/apispec/openapi.yaml b/backend/internal/httpd/apispec/openapi.yaml index c91c02e..d717a5c 100644 --- a/backend/internal/httpd/apispec/openapi.yaml +++ b/backend/internal/httpd/apispec/openapi.yaml @@ -51,6 +51,87 @@ paths: summary: Stream CDC events with durable replay tags: - events + /api/v1/notifications: + get: + operationId: listNotifications + parameters: + - description: Notification status filter. V1 supports only unread. + in: query + name: status + schema: + description: Notification status filter. V1 supports only unread. + enum: + - unread + type: string + - description: Maximum notifications to return. Defaults to 50; capped at 100. + in: query + name: limit + schema: + description: Maximum notifications to return. Defaults to 50; capped at + 100. + maximum: 100 + minimum: 1 + type: integer + responses: + "200": + content: + application/json: + schema: + $ref: '#/components/schemas/ListNotificationsResponse' + description: OK + "400": + content: + application/json: + schema: + $ref: '#/components/schemas/APIError' + description: Bad Request + "500": + content: + application/json: + schema: + $ref: '#/components/schemas/APIError' + description: Internal Server Error + "501": + content: + application/json: + schema: + $ref: '#/components/schemas/APIError' + description: Not Implemented + summary: List unread notifications + tags: + - notifications + /api/v1/notifications/stream: + get: + operationId: streamNotifications + parameters: + - description: Optional project id filter for live notifications. + in: query + name: projectId + schema: + description: Optional project id filter for live notifications. + type: string + responses: + "200": + content: + text/event-stream: + schema: + type: string + description: OK + "500": + content: + application/json: + schema: + $ref: '#/components/schemas/APIError' + description: Internal Server Error + "501": + content: + application/json: + schema: + $ref: '#/components/schemas/APIError' + description: Not Implemented + summary: Stream created notifications + tags: + - notifications /api/v1/orchestrators: get: operationId: listOrchestrators @@ -1186,6 +1267,15 @@ components: - ok - sessionId type: object + ListNotificationsResponse: + properties: + notifications: + items: + $ref: '#/components/schemas/NotificationResponse' + type: array + required: + - notifications + type: object ListProjectsResponse: properties: projects: @@ -1238,6 +1328,64 @@ components: - prNumber - method type: object + NotificationResponse: + properties: + body: + type: string + createdAt: + format: date-time + type: string + id: + type: string + prUrl: + type: string + projectId: + type: string + sessionId: + type: string + status: + enum: + - unread + - read + type: string + target: + $ref: '#/components/schemas/NotificationTarget' + title: + type: string + type: + enum: + - needs_input + - ready_to_merge + - pr_merged + - pr_closed_unmerged + type: string + required: + - id + - sessionId + - projectId + - prUrl + - type + - title + - body + - status + - createdAt + - target + type: object + NotificationTarget: + properties: + kind: + enum: + - session + - pr + type: string + prUrl: + type: string + sessionId: + type: string + required: + - kind + - sessionId + type: object OrchestratorResponse: properties: id: @@ -1691,5 +1839,7 @@ tags: name: prs - description: Code-review runs and findings name: reviews +- description: Durable dashboard notifications + name: notifications - description: Server-sent CDC event stream with durable replay name: events diff --git a/backend/internal/httpd/apispec/specgen/build.go b/backend/internal/httpd/apispec/specgen/build.go index 502cc25..5ef6b87 100644 --- a/backend/internal/httpd/apispec/specgen/build.go +++ b/backend/internal/httpd/apispec/specgen/build.go @@ -63,6 +63,8 @@ func Build() ([]byte, error) { "Pull-request actions (SCM lane)"), *(&openapi31.Tag{Name: "reviews"}).WithDescription( "Code-review runs and findings"), + *(&openapi31.Tag{Name: "notifications"}).WithDescription( + "Durable dashboard notifications"), *(&openapi31.Tag{Name: "events"}).WithDescription( "Server-sent CDC event stream with durable replay"), } @@ -155,6 +157,11 @@ var schemaNames = map[string]string{ "ControllersSpawnOrchestratorRequest": "SpawnOrchestratorRequest", "ControllersSpawnOrchestratorResponse": "SpawnOrchestratorResponse", "ControllersOrchestratorResponse": "OrchestratorResponse", + "ControllersListNotificationsQuery": "ListNotificationsQuery", + "ControllersNotificationStreamQuery": "NotificationStreamQuery", + "ControllersNotificationTarget": "NotificationTarget", + "ControllersNotificationResponse": "NotificationResponse", + "ControllersListNotificationsResponse": "ListNotificationsResponse", // httpd/controllers — PR wire envelopes "ControllersMergePRResponse": "MergePRResponse", "ControllersResolveCommentsRequest": "ResolveCommentsRequest", @@ -252,9 +259,37 @@ func operations() []operation { ops = append(ops, sessionOperations()...) ops = append(ops, prOperations()...) ops = append(ops, reviewOperations()...) + ops = append(ops, notificationOperations()...) return ops } +func notificationOperations() []operation { + return []operation{ + { + method: http.MethodGet, path: "/api/v1/notifications", id: "listNotifications", tag: "notifications", + summary: "List unread notifications", + pathParams: []any{controllers.ListNotificationsQuery{}}, + resps: []respUnit{ + {http.StatusOK, controllers.ListNotificationsResponse{}}, + {http.StatusBadRequest, envelope.APIError{}}, + {http.StatusInternalServerError, envelope.APIError{}}, + {http.StatusNotImplemented, envelope.APIError{}}, + }, + }, + { + method: http.MethodGet, path: "/api/v1/notifications/stream", id: "streamNotifications", tag: "notifications", + summary: "Stream created notifications", + pathParams: []any{controllers.NotificationStreamQuery{}}, + resps: []respUnit{ + {http.StatusOK, ""}, + {http.StatusInternalServerError, envelope.APIError{}}, + {http.StatusNotImplemented, envelope.APIError{}}, + }, + contentTypes: map[int]string{http.StatusOK: "text/event-stream"}, + }, + } +} + // reviewOperations declares the /reviews operations. Must stay 1:1 with the // routes ReviewsController.Register mounts (enforced by the parity test). func reviewOperations() []operation { diff --git a/backend/internal/httpd/controllers/dto.go b/backend/internal/httpd/controllers/dto.go index 557c42e..d2bb10a 100644 --- a/backend/internal/httpd/controllers/dto.go +++ b/backend/internal/httpd/controllers/dto.go @@ -262,6 +262,43 @@ type OrchestratorResponse struct { ProjectName string `json:"projectName,omitempty"` } +// ListNotificationsQuery is the query string accepted by GET /api/v1/notifications. +type ListNotificationsQuery struct { + Status string `query:"status,omitempty" enum:"unread" description:"Notification status filter. V1 supports only unread."` + Limit int `query:"limit,omitempty" minimum:"1" maximum:"100" description:"Maximum notifications to return. Defaults to 50; capped at 100."` +} + +// NotificationStreamQuery is the query string accepted by GET /api/v1/notifications/stream. +type NotificationStreamQuery struct { + ProjectID string `query:"projectId,omitempty" description:"Optional project id filter for live notifications."` +} + +// NotificationTarget is the dashboard navigation target for a notification. +type NotificationTarget struct { + Kind string `json:"kind" enum:"session,pr"` + SessionID string `json:"sessionId"` + PRURL string `json:"prUrl,omitempty"` +} + +// NotificationResponse is one stored notification returned by the API. +type NotificationResponse struct { + ID string `json:"id"` + SessionID string `json:"sessionId"` + ProjectID string `json:"projectId"` + PRURL string `json:"prUrl"` + Type string `json:"type" enum:"needs_input,ready_to_merge,pr_merged,pr_closed_unmerged"` + Title string `json:"title"` + Body string `json:"body"` + Status string `json:"status" enum:"unread,read"` + CreatedAt time.Time `json:"createdAt"` + Target NotificationTarget `json:"target"` +} + +// ListNotificationsResponse is the body of GET /api/v1/notifications. +type ListNotificationsResponse struct { + Notifications []NotificationResponse `json:"notifications"` +} + // PRIDParam is the {id} path parameter shared by the /prs/{id} routes. type PRIDParam struct { ID string `path:"id" description:"PR number."` diff --git a/backend/internal/httpd/controllers/notifications.go b/backend/internal/httpd/controllers/notifications.go new file mode 100644 index 0000000..273d52a --- /dev/null +++ b/backend/internal/httpd/controllers/notifications.go @@ -0,0 +1,185 @@ +package controllers + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "strconv" + + "github.com/go-chi/chi/v5" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" + "github.com/aoagents/agent-orchestrator/backend/internal/httpd/apispec" + "github.com/aoagents/agent-orchestrator/backend/internal/httpd/envelope" + notificationsvc "github.com/aoagents/agent-orchestrator/backend/internal/service/notification" +) + +// NotificationService is the controller-facing notification service contract. +type NotificationService interface { + ListUnread(ctx context.Context, filter notificationsvc.ListFilter) ([]notificationsvc.Notification, error) +} + +// NotificationStream is the live notification stream used by SSE clients. +type NotificationStream interface { + Subscribe(projectID domain.ProjectID) (<-chan domain.NotificationRecord, func()) +} + +// NotificationsController owns the /notifications routes. +type NotificationsController struct { + Svc NotificationService + Stream NotificationStream +} + +// Register mounts bounded notification REST routes on the supplied router. +func (c *NotificationsController) Register(r chi.Router) { + r.Get("/notifications", c.list) +} + +// RegisterStream mounts long-lived notification stream routes on the supplied router. +func (c *NotificationsController) RegisterStream(r chi.Router) { + r.Get("/notifications/stream", c.stream) +} + +func (c *NotificationsController) list(w http.ResponseWriter, r *http.Request) { + if c.Svc == nil { + apispec.NotImplemented(w, r, "GET", "/api/v1/notifications") + return + } + filter, err := parseNotificationListFilter(r) + if err != nil { + envelope.WriteAPIError(w, r, http.StatusBadRequest, "bad_request", "INVALID_QUERY", err.Error(), nil) + return + } + notifications, err := c.Svc.ListUnread(r.Context(), filter) + if err != nil { + envelope.WriteError(w, r, err) + return + } + envelope.WriteJSON(w, http.StatusOK, ListNotificationsResponse{Notifications: notificationResponses(notifications)}) +} + +func (c *NotificationsController) stream(w http.ResponseWriter, r *http.Request) { + if c.Stream == nil { + apispec.NotImplemented(w, r, "GET", "/api/v1/notifications/stream") + return + } + flusher, ok := w.(http.Flusher) + if !ok { + envelope.WriteAPIError(w, r, http.StatusInternalServerError, "internal", "SSE_UNSUPPORTED", "Streaming is not supported by this server", nil) + return + } + ch, unsubscribe := c.Stream.Subscribe(domain.ProjectID(r.URL.Query().Get("projectId"))) + defer unsubscribe() + + h := w.Header() + h.Set("Content-Type", "text/event-stream; charset=utf-8") + h.Set("Cache-Control", "no-cache") + h.Set("Connection", "keep-alive") + h.Set("X-Accel-Buffering", "no") + w.WriteHeader(http.StatusOK) + flusher.Flush() + + for { + select { + case <-r.Context().Done(): + return + case rec, ok := <-ch: + if !ok { + return + } + if err := writeNotificationSSE(w, flusher, rec); err != nil { + return + } + } + } +} + +func writeNotificationSSE(w http.ResponseWriter, flusher http.Flusher, rec domain.NotificationRecord) error { + data, err := json.Marshal(notificationResponseFromRecord(rec)) + if err != nil { + return err + } + if _, err := fmt.Fprintf(w, "event: notification_created\ndata: %s\n\n", data); err != nil { + return err + } + flusher.Flush() + return nil +} + +func parseNotificationListFilter(r *http.Request) (notificationsvc.ListFilter, error) { + q := r.URL.Query() + status := q.Get("status") + if status == "" { + status = "unread" + } + if status != "unread" { + return notificationsvc.ListFilter{}, errNotificationStatusUnsupported + } + limit := notificationsvc.DefaultListLimit + if raw := q.Get("limit"); raw != "" { + parsed, err := strconv.Atoi(raw) + if err != nil || parsed <= 0 { + return notificationsvc.ListFilter{}, errNotificationLimitInvalid + } + limit = parsed + } + if limit > notificationsvc.MaxListLimit { + limit = notificationsvc.MaxListLimit + } + return notificationsvc.ListFilter{Limit: limit}, nil +} + +var ( + errNotificationStatusUnsupported = notificationQueryError("status must be unread") + errNotificationLimitInvalid = notificationQueryError("limit must be a positive integer") +) + +type notificationQueryError string + +func (e notificationQueryError) Error() string { return string(e) } + +func notificationResponses(in []notificationsvc.Notification) []NotificationResponse { + out := make([]NotificationResponse, 0, len(in)) + for _, n := range in { + out = append(out, NotificationResponse{ + ID: n.ID, + SessionID: string(n.SessionID), + ProjectID: string(n.ProjectID), + PRURL: n.PRURL, + Type: string(n.Type), + Title: n.Title, + Body: n.Body, + Status: string(n.Status), + CreatedAt: n.CreatedAt, + Target: NotificationTarget{ + Kind: string(n.Target.Kind), + SessionID: string(n.Target.SessionID), + PRURL: n.Target.PRURL, + }, + }) + } + return out +} + +func notificationResponseFromRecord(rec domain.NotificationRecord) NotificationResponse { + return NotificationResponse{ + ID: rec.ID, + SessionID: string(rec.SessionID), + ProjectID: string(rec.ProjectID), + PRURL: rec.PRURL, + Type: string(rec.Type), + Title: rec.Title, + Body: rec.Body, + Status: string(rec.Status), + CreatedAt: rec.CreatedAt, + Target: notificationTargetFromRecord(rec), + } +} + +func notificationTargetFromRecord(rec domain.NotificationRecord) NotificationTarget { + if rec.PRURL != "" { + return NotificationTarget{Kind: "pr", SessionID: string(rec.SessionID), PRURL: rec.PRURL} + } + return NotificationTarget{Kind: "session", SessionID: string(rec.SessionID)} +} diff --git a/backend/internal/httpd/controllers/notifications_test.go b/backend/internal/httpd/controllers/notifications_test.go new file mode 100644 index 0000000..def4992 --- /dev/null +++ b/backend/internal/httpd/controllers/notifications_test.go @@ -0,0 +1,157 @@ +package controllers_test + +import ( + "bufio" + "context" + "io" + "log/slog" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/config" + "github.com/aoagents/agent-orchestrator/backend/internal/domain" + "github.com/aoagents/agent-orchestrator/backend/internal/httpd" + "github.com/aoagents/agent-orchestrator/backend/internal/httpd/controllers" + notificationsvc "github.com/aoagents/agent-orchestrator/backend/internal/service/notification" +) + +type fakeNotificationService struct { + gotFilter notificationsvc.ListFilter + items []notificationsvc.Notification + err error +} + +type fakeNotificationStream struct { + gotProject domain.ProjectID + ch chan domain.NotificationRecord +} + +func (f *fakeNotificationService) ListUnread(_ context.Context, filter notificationsvc.ListFilter) ([]notificationsvc.Notification, error) { + f.gotFilter = filter + return f.items, f.err +} + +func (f *fakeNotificationStream) Subscribe(projectID domain.ProjectID) (<-chan domain.NotificationRecord, func()) { + f.gotProject = projectID + if f.ch == nil { + f.ch = make(chan domain.NotificationRecord, 1) + } + return f.ch, func() {} +} + +func newNotificationTestServer(t *testing.T, svc controllers.NotificationService) *httptest.Server { + t.Helper() + return newNotificationStreamTestServer(t, svc, nil) +} + +func newNotificationStreamTestServer(t *testing.T, svc controllers.NotificationService, stream controllers.NotificationStream) *httptest.Server { + t.Helper() + log := slog.New(slog.NewTextHandler(io.Discard, nil)) + srv := httptest.NewServer(httpd.NewRouterWithControl(config.Config{}, log, nil, httpd.APIDeps{Notifications: svc, NotificationStream: stream}, httpd.ControlDeps{})) + t.Cleanup(srv.Close) + return srv +} + +func TestNotificationsAPI_ListUnread(t *testing.T) { + now := time.Date(2026, 6, 11, 10, 0, 0, 0, time.UTC) + svc := &fakeNotificationService{items: []notificationsvc.Notification{{ + NotificationRecord: domain.NotificationRecord{ID: "ntf_1", SessionID: "mer-1", ProjectID: "mer", Type: domain.NotificationNeedsInput, Title: "checkout-flow needs input", Body: "The agent is waiting for your response.", Status: domain.NotificationUnread, CreatedAt: now}, + Target: notificationsvc.Target{Kind: notificationsvc.TargetSession, SessionID: "mer-1"}, + }}} + srv := newNotificationTestServer(t, svc) + + body, status, _ := doRequest(t, srv, "GET", "/api/v1/notifications?limit=10", "") + if status != http.StatusOK { + t.Fatalf("status = %d, want 200; body=%s", status, body) + } + if svc.gotFilter.Limit != 10 { + t.Fatalf("filter = %+v", svc.gotFilter) + } + var resp struct { + Notifications []struct { + ID string `json:"id"` + SessionID string `json:"sessionId"` + ProjectID string `json:"projectId"` + Type string `json:"type"` + Status string `json:"status"` + Target struct { + Kind string `json:"kind"` + SessionID string `json:"sessionId"` + } `json:"target"` + } `json:"notifications"` + } + mustJSON(t, body, &resp) + if len(resp.Notifications) != 1 || resp.Notifications[0].ID != "ntf_1" || resp.Notifications[0].Target.Kind != "session" { + t.Fatalf("resp = %+v", resp) + } +} + +func TestNotificationsAPI_DefaultsAndCapsLimit(t *testing.T) { + svc := &fakeNotificationService{} + srv := newNotificationTestServer(t, svc) + + _, status, _ := doRequest(t, srv, "GET", "/api/v1/notifications?limit=999", "") + if status != http.StatusOK { + t.Fatalf("status = %d, want 200", status) + } + if svc.gotFilter.Limit != notificationsvc.MaxListLimit { + t.Fatalf("limit = %d, want cap %d", svc.gotFilter.Limit, notificationsvc.MaxListLimit) + } +} + +func TestNotificationsAPI_RejectsUnsupportedStatus(t *testing.T) { + srv := newNotificationTestServer(t, &fakeNotificationService{}) + + body, status, _ := doRequest(t, srv, "GET", "/api/v1/notifications?status=read", "") + assertErrorCode(t, body, status, http.StatusBadRequest, "INVALID_QUERY") +} + +func TestNotificationsAPI_WithoutServiceIs501(t *testing.T) { + srv := newNotificationTestServer(t, nil) + + body, status, _ := doRequest(t, srv, "GET", "/api/v1/notifications", "") + assertErrorCode(t, body, status, http.StatusNotImplemented, "NOT_IMPLEMENTED") +} + +func TestNotificationsAPI_StreamCreatedNotifications(t *testing.T) { + stream := &fakeNotificationStream{ch: make(chan domain.NotificationRecord, 1)} + srv := newNotificationStreamTestServer(t, &fakeNotificationService{}, stream) + + resp, err := srv.Client().Get(srv.URL + "/api/v1/notifications/stream?projectId=mer") + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + t.Fatalf("status = %d, want 200", resp.StatusCode) + } + if ct := resp.Header.Get("Content-Type"); !strings.HasPrefix(ct, "text/event-stream") { + t.Fatalf("content-type = %q", ct) + } + if stream.gotProject != "mer" { + t.Fatalf("project filter = %q", stream.gotProject) + } + + stream.ch <- domain.NotificationRecord{ID: "ntf_1", SessionID: "mer-1", ProjectID: "mer", Type: domain.NotificationNeedsInput, Title: "needs input", Status: domain.NotificationUnread, CreatedAt: time.Now()} + reader := bufio.NewReader(resp.Body) + eventLine, err := reader.ReadString('\n') + if err != nil { + t.Fatal(err) + } + dataLine, err := reader.ReadString('\n') + if err != nil { + t.Fatal(err) + } + if strings.TrimSpace(eventLine) != "event: notification_created" || !strings.Contains(dataLine, `"id":"ntf_1"`) { + t.Fatalf("eventLine=%q dataLine=%q", eventLine, dataLine) + } +} + +func TestNotificationsAPI_StreamWithoutPublisherIs501(t *testing.T) { + srv := newNotificationStreamTestServer(t, &fakeNotificationService{}, nil) + body, status, _ := doRequest(t, srv, "GET", "/api/v1/notifications/stream", "") + assertErrorCode(t, body, status, http.StatusNotImplemented, "NOT_IMPLEMENTED") +} diff --git a/backend/internal/httpd/controllers/sessions.go b/backend/internal/httpd/controllers/sessions.go index bf03358..3d0b2bb 100644 --- a/backend/internal/httpd/controllers/sessions.go +++ b/backend/internal/httpd/controllers/sessions.go @@ -304,6 +304,10 @@ func (c *SessionsController) activity(w http.ResponseWriter, r *http.Request) { return } if err := c.Activity.ApplyActivitySignal(r.Context(), sessionID(r), ports.ActivitySignal{Valid: true, State: state}); err != nil { + if errors.Is(err, ports.ErrSessionNotFound) { + envelope.WriteAPIError(w, r, http.StatusNotFound, "not_found", "SESSION_NOT_FOUND", "Unknown session", nil) + return + } envelope.WriteError(w, r, err) return } diff --git a/backend/internal/httpd/controllers/sessions_activity_test.go b/backend/internal/httpd/controllers/sessions_activity_test.go index ed63dc4..5d7b669 100644 --- a/backend/internal/httpd/controllers/sessions_activity_test.go +++ b/backend/internal/httpd/controllers/sessions_activity_test.go @@ -84,6 +84,13 @@ func TestSessionsAPI_ActivityRejectsBadJSON(t *testing.T) { assertErrorCode(t, body, status, http.StatusBadRequest, "INVALID_JSON") } +func TestSessionsAPI_ActivityMissingSessionIs404(t *testing.T) { + srv := newActivityTestServer(t, &fakeActivityRecorder{err: ports.ErrSessionNotFound}) + + body, status, _ := doRequest(t, srv, "POST", "/api/v1/sessions/missing/activity", `{"state":"idle"}`) + assertErrorCode(t, body, status, http.StatusNotFound, "SESSION_NOT_FOUND") +} + func TestSessionsAPI_ActivityRecorderErrorIs500(t *testing.T) { srv := newActivityTestServer(t, &fakeActivityRecorder{err: errors.New("boom")}) diff --git a/backend/internal/lifecycle/manager.go b/backend/internal/lifecycle/manager.go index 925af55..bbb2896 100644 --- a/backend/internal/lifecycle/manager.go +++ b/backend/internal/lifecycle/manager.go @@ -7,6 +7,7 @@ package lifecycle import ( "context" "fmt" + "log/slog" "sync" "time" @@ -23,11 +24,25 @@ type sessionStore interface { UpdatePRLastNudgeSignature(ctx context.Context, prURL, payload string) error } +// notificationSink is the optional lifecycle-to-notification-producer boundary. +type notificationSink interface { + Notify(ctx context.Context, intent ports.NotificationIntent) error +} + +// Option customizes a Manager. +type Option func(*Manager) + +// WithNotificationSink wires lifecycle notification intents to a write-side producer. +func WithNotificationSink(sink notificationSink) Option { + return func(m *Manager) { m.notifications = sink } +} + // Manager reduces runtime, activity, spawn, and termination observations into durable session facts. // It also owns agent nudges caused by PR observations, including merge-conflict, CI-failure, and review-feedback prompts. type Manager struct { - store sessionStore - messenger ports.AgentMessenger + store sessionStore + messenger ports.AgentMessenger + notifications notificationSink mu sync.Mutex window time.Duration @@ -36,8 +51,12 @@ type Manager struct { } // New builds a Lifecycle Manager over the session store it writes and the messenger it uses for agent nudges. -func New(store sessionStore, messenger ports.AgentMessenger) *Manager { - return &Manager{store: store, messenger: messenger, window: defaultRecentActivityWindow, clock: time.Now, react: newReactionState()} +func New(store sessionStore, messenger ports.AgentMessenger, opts ...Option) *Manager { + m := &Manager{store: store, messenger: messenger, window: defaultRecentActivityWindow, clock: time.Now, react: newReactionState()} + for _, opt := range opts { + opt(m) + } + return m } func (m *Manager) mutate(ctx context.Context, id domain.SessionID, fn func(domain.SessionRecord, time.Time) (domain.SessionRecord, bool)) error { @@ -79,29 +98,66 @@ func (m *Manager) ApplyActivitySignal(ctx context.Context, id domain.SessionID, if !s.Valid { return nil } - return m.mutate(ctx, id, func(cur domain.SessionRecord, now time.Time) (domain.SessionRecord, bool) { - if cur.IsTerminated { - return cur, false - } - next := cur - act := domain.Activity{State: s.State, LastActivityAt: timeOr(s.Timestamp, now)} - // A same-state repeat is still a write when it is the FIRST signal for - // this spawn: the receipt itself is a durable fact (it clears the - // no_signal display status). Hook deliveries are best-effort, so the - // first to ARRIVE may match the seeded state — e.g. a turn's "active" - // POST is lost and its Stop hook lands idle on the idle-seeded row. - if sameActivity(cur.Activity, act) && !cur.FirstSignalAt.IsZero() { - return cur, false - } - next.Activity = act - if next.FirstSignalAt.IsZero() { - next.FirstSignalAt = timeOr(s.Timestamp, now) - } - if s.State == domain.ActivityExited { - next.IsTerminated = true + var intent *ports.NotificationIntent + m.mu.Lock() + rec, ok, err := m.store.GetSession(ctx, id) + if err != nil { + m.mu.Unlock() + return err + } + if !ok { + m.mu.Unlock() + return fmt.Errorf("%w: %s", ports.ErrSessionNotFound, id) + } + now := m.clock() + if rec.IsTerminated { + m.mu.Unlock() + return nil + } + next := rec + act := domain.Activity{State: s.State, LastActivityAt: timeOr(s.Timestamp, now)} + // A same-state repeat is still a write when it is the FIRST signal for + // this spawn: the receipt itself is a durable fact (it clears the + // no_signal display status). Hook deliveries are best-effort, so the + // first to ARRIVE may match the seeded state — e.g. a turn's "active" + // POST is lost and its Stop hook lands idle on the idle-seeded row. + if sameActivity(rec.Activity, act) && !rec.FirstSignalAt.IsZero() { + m.mu.Unlock() + return nil + } + next.Activity = act + if next.FirstSignalAt.IsZero() { + next.FirstSignalAt = timeOr(s.Timestamp, now) + } + if s.State == domain.ActivityExited { + next.IsTerminated = true + } + next.UpdatedAt = now + if err := m.store.UpdateSession(ctx, next); err != nil { + m.mu.Unlock() + return err + } + if rec.Activity.State != domain.ActivityWaitingInput && next.Activity.State == domain.ActivityWaitingInput && !next.IsTerminated { + intent = &ports.NotificationIntent{ + Type: domain.NotificationNeedsInput, + SessionID: next.ID, + ProjectID: next.ProjectID, + CreatedAt: next.Activity.LastActivityAt, + SessionDisplayName: next.DisplayName, } - return next, true - }) + } + m.mu.Unlock() + m.emitNotification(ctx, intent) + return nil +} + +func (m *Manager) emitNotification(ctx context.Context, intent *ports.NotificationIntent) { + if intent == nil || m.notifications == nil { + return + } + if err := m.notifications.Notify(ctx, *intent); err != nil { + slog.Default().Warn("lifecycle: notification failed", "session", intent.SessionID, "type", intent.Type, "err", err) + } } // MarkSpawned marks a newly spawned or restored session live and stores runtime/workspace handles. diff --git a/backend/internal/lifecycle/manager_test.go b/backend/internal/lifecycle/manager_test.go index e739fa0..09adbf7 100644 --- a/backend/internal/lifecycle/manager_test.go +++ b/backend/internal/lifecycle/manager_test.go @@ -112,6 +112,14 @@ func TestActivity_InvalidIsIgnored(t *testing.T) { } } +func TestActivity_MissingSessionReturnsNotFound(t *testing.T) { + m, _, _ := newManager() + err := m.ApplyActivitySignal(ctx, "missing-1", ports.ActivitySignal{Valid: true, State: domain.ActivityWaitingInput}) + if !errors.Is(err, ports.ErrSessionNotFound) { + t.Fatalf("err = %v, want ErrSessionNotFound", err) + } +} + func TestMarkTerminated(t *testing.T) { m, st, _ := newManager() st.sessions["mer-1"] = working("mer-1") @@ -184,6 +192,18 @@ func TestSCMObservationProjectsToExistingPRReactions(t *testing.T) { } } +func TestSCMObservation_MissingSessionIsIgnored(t *testing.T) { + st := newFakeStore() + m := New(st, nil) + o := ports.SCMObservation{ + Fetched: true, + PR: ports.SCMPRObservation{URL: "pr1", Number: 1}, + } + if err := m.ApplySCMObservation(ctx, "missing-1", o); err != nil { + t.Fatalf("ApplySCMObservation missing session: %v", err) + } +} + func TestSCMObservationUsesPRHeadWhenCIHeadMissing(t *testing.T) { m, st, msg := newManager() st.sessions["mer-1"] = working("mer-1") @@ -546,3 +566,144 @@ func TestMarkSpawnedClearsFirstSignal(t *testing.T) { t.Fatalf("spawn/restore must clear the receipt, got %+v", got) } } + +type fakeNotificationSink struct { + intents []ports.NotificationIntent + err error +} + +func (f *fakeNotificationSink) Notify(_ context.Context, intent ports.NotificationIntent) error { + f.intents = append(f.intents, intent) + return f.err +} + +func TestActivity_WaitingInputTransitionEmitsNotification(t *testing.T) { + st := newFakeStore() + sink := &fakeNotificationSink{} + m := New(st, nil, WithNotificationSink(sink)) + now := time.Date(2026, 6, 11, 10, 0, 0, 0, time.UTC) + m.clock = func() time.Time { return now } + st.sessions["mer-1"] = domain.SessionRecord{ID: "mer-1", ProjectID: "mer", DisplayName: "checkout-flow", Activity: domain.Activity{State: domain.ActivityActive, LastActivityAt: now.Add(-time.Minute)}, FirstSignalAt: now.Add(-time.Minute)} + + if err := m.ApplyActivitySignal(ctx, "mer-1", ports.ActivitySignal{Valid: true, State: domain.ActivityWaitingInput}); err != nil { + t.Fatal(err) + } + if len(sink.intents) != 1 { + t.Fatalf("intents = %d, want 1", len(sink.intents)) + } + intent := sink.intents[0] + if intent.Type != domain.NotificationNeedsInput || intent.SessionID != "mer-1" || intent.ProjectID != "mer" || intent.SessionDisplayName != "checkout-flow" { + t.Fatalf("intent = %+v", intent) + } +} + +func TestActivity_WaitingInputSameStateDoesNotEmitNotification(t *testing.T) { + st := newFakeStore() + sink := &fakeNotificationSink{} + m := New(st, nil, WithNotificationSink(sink)) + now := time.Now() + st.sessions["mer-1"] = domain.SessionRecord{ID: "mer-1", ProjectID: "mer", Activity: domain.Activity{State: domain.ActivityWaitingInput, LastActivityAt: now}, FirstSignalAt: now} + + if err := m.ApplyActivitySignal(ctx, "mer-1", ports.ActivitySignal{Valid: true, State: domain.ActivityWaitingInput}); err != nil { + t.Fatal(err) + } + if len(sink.intents) != 0 { + t.Fatalf("same-state waiting_input emitted %+v", sink.intents) + } +} + +func TestActivity_TerminatedSessionDoesNotEmitNotification(t *testing.T) { + st := newFakeStore() + sink := &fakeNotificationSink{} + m := New(st, nil, WithNotificationSink(sink)) + st.sessions["mer-1"] = domain.SessionRecord{ID: "mer-1", ProjectID: "mer", IsTerminated: true, Activity: domain.Activity{State: domain.ActivityExited}} + + if err := m.ApplyActivitySignal(ctx, "mer-1", ports.ActivitySignal{Valid: true, State: domain.ActivityWaitingInput}); err != nil { + t.Fatal(err) + } + if len(sink.intents) != 0 { + t.Fatalf("terminated session emitted %+v", sink.intents) + } +} + +func TestSCMObservation_Notifications(t *testing.T) { + for _, tc := range []struct { + name string + obs ports.SCMObservation + want domain.NotificationType + }{ + { + name: "ready", + obs: ports.SCMObservation{Fetched: true, PR: ports.SCMPRObservation{URL: "https://github.com/o/r/pull/1", Number: 1, Title: "checkout"}, CI: ports.SCMCIObservation{Summary: string(domain.CIPassing)}, Review: ports.SCMReviewObservation{Decision: string(domain.ReviewApproved)}, Mergeability: ports.SCMMergeabilityObservation{State: string(domain.MergeMergeable)}}, + want: domain.NotificationReadyToMerge, + }, + { + name: "merged", + obs: ports.SCMObservation{Fetched: true, PR: ports.SCMPRObservation{URL: "https://github.com/o/r/pull/2", Number: 2, Merged: true}}, + want: domain.NotificationPRMerged, + }, + { + name: "closed", + obs: ports.SCMObservation{Fetched: true, PR: ports.SCMPRObservation{URL: "https://github.com/o/r/pull/3", Number: 3, Closed: true}}, + want: domain.NotificationPRClosedUnmerged, + }, + } { + t.Run(tc.name, func(t *testing.T) { + st := newFakeStore() + sink := &fakeNotificationSink{} + m := New(st, nil, WithNotificationSink(sink)) + st.sessions["mer-1"] = working("mer-1") + if err := m.ApplySCMObservation(ctx, "mer-1", tc.obs); err != nil { + t.Fatal(err) + } + if len(sink.intents) != 1 { + t.Fatalf("intents = %d, want 1", len(sink.intents)) + } + if got := sink.intents[0]; got.Type != tc.want || got.PRURL != tc.obs.PR.URL || got.PRNumber != tc.obs.PR.Number { + t.Fatalf("intent = %+v, want type %s", got, tc.want) + } + }) + } +} + +func TestSCMObservation_NotReadyWhenCIOrReviewBlocks(t *testing.T) { + for _, obs := range []ports.SCMObservation{ + {Fetched: true, PR: ports.SCMPRObservation{URL: "https://github.com/o/r/pull/1", Number: 1}, CI: ports.SCMCIObservation{Summary: string(domain.CIFailing)}, Mergeability: ports.SCMMergeabilityObservation{State: string(domain.MergeMergeable)}}, + {Fetched: true, PR: ports.SCMPRObservation{URL: "https://github.com/o/r/pull/1", Number: 1}, CI: ports.SCMCIObservation{Summary: string(domain.CIPending)}, Mergeability: ports.SCMMergeabilityObservation{State: string(domain.MergeMergeable)}}, + {Fetched: true, PR: ports.SCMPRObservation{URL: "https://github.com/o/r/pull/1", Number: 1}, CI: ports.SCMCIObservation{Summary: string(domain.CIUnknown)}, Mergeability: ports.SCMMergeabilityObservation{State: string(domain.MergeMergeable)}}, + {Fetched: true, PR: ports.SCMPRObservation{URL: "https://github.com/o/r/pull/1", Number: 1}, Mergeability: ports.SCMMergeabilityObservation{State: string(domain.MergeMergeable)}}, + {Fetched: true, PR: ports.SCMPRObservation{URL: "https://github.com/o/r/pull/1", Number: 1}, CI: ports.SCMCIObservation{Summary: string(domain.CIPassing)}, Review: ports.SCMReviewObservation{Decision: string(domain.ReviewChangesRequest)}, Mergeability: ports.SCMMergeabilityObservation{State: string(domain.MergeMergeable)}}, + } { + st := newFakeStore() + sink := &fakeNotificationSink{} + m := New(st, nil, WithNotificationSink(sink)) + st.sessions["mer-1"] = working("mer-1") + if err := m.ApplySCMObservation(ctx, "mer-1", obs); err != nil { + t.Fatal(err) + } + if len(sink.intents) != 0 { + t.Fatalf("blocked PR emitted %+v", sink.intents) + } + } +} + +func TestSCMObservation_ReadyToMergeSuppressedWhileWaitingInput(t *testing.T) { + st := newFakeStore() + sink := &fakeNotificationSink{} + m := New(st, nil, WithNotificationSink(sink)) + rec := working("mer-1") + rec.Activity.State = domain.ActivityWaitingInput + st.sessions["mer-1"] = rec + obs := ports.SCMObservation{ + Fetched: true, + PR: ports.SCMPRObservation{URL: "https://github.com/o/r/pull/1", Number: 1}, + CI: ports.SCMCIObservation{Summary: string(domain.CIPassing)}, + Mergeability: ports.SCMMergeabilityObservation{State: string(domain.MergeMergeable)}, + } + if err := m.ApplySCMObservation(ctx, "mer-1", obs); err != nil { + t.Fatal(err) + } + if len(sink.intents) != 0 { + t.Fatalf("waiting-input session emitted ready notification: %+v", sink.intents) + } +} diff --git a/backend/internal/lifecycle/reactions.go b/backend/internal/lifecycle/reactions.go index 614d3db..bfffd07 100644 --- a/backend/internal/lifecycle/reactions.go +++ b/backend/internal/lifecycle/reactions.go @@ -92,7 +92,92 @@ func (m *Manager) ApplySCMObservation(ctx context.Context, id domain.SessionID, if !o.Fetched { return nil } - return m.ApplyPRObservation(ctx, id, scmToPRObservation(o)) + if err := m.ApplyPRObservation(ctx, id, scmToPRObservation(o)); err != nil { + return err + } + intent, err := m.notificationIntentForCurrentSCM(ctx, id, o) + if err != nil { + return err + } + m.emitNotification(ctx, intent) + return nil +} + +func (m *Manager) notificationIntentForCurrentSCM(ctx context.Context, id domain.SessionID, o ports.SCMObservation) (*ports.NotificationIntent, error) { + // Serialize the session snapshot with activity transitions so ready-to-merge + // notifications do not race against a simultaneous waiting_input update. + m.mu.Lock() + defer m.mu.Unlock() + rec, ok, err := m.store.GetSession(ctx, id) + if err != nil { + return nil, err + } + if !ok { + return nil, nil + } + return m.notificationIntentForSCM(rec, o), nil +} + +func (m *Manager) notificationIntentForSCM(rec domain.SessionRecord, o ports.SCMObservation) *ports.NotificationIntent { + prURL := firstSCMNonEmpty(o.PR.URL, o.PR.HTMLURL) + base := ports.NotificationIntent{ + SessionID: rec.ID, + ProjectID: rec.ProjectID, + PRURL: prURL, + CreatedAt: timeOr(o.ObservedAt, m.clock()), + SessionDisplayName: rec.DisplayName, + PRNumber: o.PR.Number, + PRTitle: o.PR.Title, + PRSourceBranch: o.PR.SourceBranch, + PRTargetBranch: o.PR.TargetBranch, + Provider: o.Provider, + Repo: o.Repo, + } + if o.PR.Merged { + base.Type = domain.NotificationPRMerged + return &base + } + if o.PR.Closed { + base.Type = domain.NotificationPRClosedUnmerged + return &base + } + if rec.IsTerminated || rec.Activity.State == domain.ActivityWaitingInput || !scmObservationIsReadyToMerge(o) { + return nil + } + base.Type = domain.NotificationReadyToMerge + return &base +} + +func scmObservationIsReadyToMerge(o ports.SCMObservation) bool { + if o.PR.Merged || o.PR.Closed || o.PR.Draft { + return false + } + ci := domain.CIState(o.CI.Summary) + if ci == "" { + ci = domain.CIUnknown + } + switch ci { + case domain.CIFailing, domain.CIPending, domain.CIUnknown: + return false + } + if domain.ReviewDecision(o.Review.Decision) == domain.ReviewChangesRequest || hasUnresolvedSCMComments(o.Review.Threads) { + return false + } + return domain.Mergeability(o.Mergeability.State) == domain.MergeMergeable +} + +func hasUnresolvedSCMComments(threads []ports.SCMReviewThreadObservation) bool { + for _, th := range threads { + if th.Resolved || th.IsBot { + continue + } + for _, c := range th.Comments { + if !c.IsBot { + return true + } + } + } + return false } func scmToPRObservation(o ports.SCMObservation) ports.PRObservation { diff --git a/backend/internal/notify/enrich.go b/backend/internal/notify/enrich.go new file mode 100644 index 0000000..728e270 --- /dev/null +++ b/backend/internal/notify/enrich.go @@ -0,0 +1,90 @@ +package notify + +import ( + "fmt" + "strings" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" +) + +func enrich(intent Intent) (domain.NotificationRecord, error) { + rec := domain.NotificationRecord{ + SessionID: intent.SessionID, + ProjectID: intent.ProjectID, + PRURL: strings.TrimSpace(intent.PRURL), + Type: intent.Type, + Status: domain.NotificationUnread, + CreatedAt: intent.CreatedAt, + } + if !intent.Type.Valid() { + return domain.NotificationRecord{}, domain.ErrInvalidNotificationType + } + if intent.Type != domain.NotificationNeedsInput && rec.PRURL == "" { + return domain.NotificationRecord{}, domain.ErrInvalidNotificationRecord + } + rec.Title = titleForIntent(intent) + rec.Body = bodyForIntent(intent) + if err := rec.Validate(); err != nil { + return domain.NotificationRecord{}, err + } + return rec, nil +} + +func titleForIntent(intent Intent) string { + switch intent.Type { + case domain.NotificationNeedsInput: + return fmt.Sprintf("%s needs input", sessionLabel(intent)) + case domain.NotificationReadyToMerge: + return fmt.Sprintf("%s is ready to merge", prLabel(intent)) + case domain.NotificationPRMerged: + return fmt.Sprintf("%s was merged", prLabel(intent)) + case domain.NotificationPRClosedUnmerged: + return fmt.Sprintf("%s was closed without merging", prLabel(intent)) + default: + return "Notification" + } +} + +func bodyForIntent(intent Intent) string { + switch intent.Type { + case domain.NotificationNeedsInput: + return "The agent is waiting for your response." + case domain.NotificationReadyToMerge: + if s := sessionLabel(intent); s != "session" { + return fmt.Sprintf("%s has no known blocking CI or review feedback.", s) + } + return "The pull request has no known blocking CI or review feedback." + case domain.NotificationPRMerged: + if title := strings.TrimSpace(intent.PRTitle); title != "" { + return fmt.Sprintf("%s was merged.", title) + } + return "The pull request was merged." + case domain.NotificationPRClosedUnmerged: + if title := strings.TrimSpace(intent.PRTitle); title != "" { + return fmt.Sprintf("%s was closed without merging.", title) + } + return "The pull request was closed without merging." + default: + return "" + } +} + +func sessionLabel(intent Intent) string { + if v := strings.TrimSpace(intent.SessionDisplayName); v != "" { + return v + } + if intent.SessionID != "" { + return string(intent.SessionID) + } + return "session" +} + +func prLabel(intent Intent) string { + if intent.PRNumber > 0 { + return fmt.Sprintf("PR #%d", intent.PRNumber) + } + if title := strings.TrimSpace(intent.PRTitle); title != "" { + return "PR " + title + } + return "PR" +} diff --git a/backend/internal/notify/hub.go b/backend/internal/notify/hub.go new file mode 100644 index 0000000..51f22c9 --- /dev/null +++ b/backend/internal/notify/hub.go @@ -0,0 +1,69 @@ +package notify + +import ( + "context" + "sync" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" +) + +const subscriberBuffer = 64 + +type subscription struct { + projectID domain.ProjectID + ch chan domain.NotificationRecord +} + +// Hub is an in-process publisher for notification SSE subscribers. +type Hub struct { + mu sync.RWMutex + nextID int + subs map[int]subscription +} + +// NewHub constructs an empty notification Hub. +func NewHub() *Hub { + return &Hub{subs: map[int]subscription{}} +} + +// Subscribe registers a live notification subscriber. Empty projectID receives all projects. +func (h *Hub) Subscribe(projectID domain.ProjectID) (<-chan domain.NotificationRecord, func()) { + if h == nil { + ch := make(chan domain.NotificationRecord) + close(ch) + return ch, func() {} + } + ch := make(chan domain.NotificationRecord, subscriberBuffer) + h.mu.Lock() + id := h.nextID + h.nextID++ + h.subs[id] = subscription{projectID: projectID, ch: ch} + h.mu.Unlock() + return ch, func() { + h.mu.Lock() + if sub, ok := h.subs[id]; ok { + delete(h.subs, id) + close(sub.ch) + } + h.mu.Unlock() + } +} + +// Publish pushes a persisted notification to matching subscribers without blocking lifecycle writes. +func (h *Hub) Publish(_ context.Context, rec domain.NotificationRecord) error { + if h == nil { + return nil + } + h.mu.RLock() + defer h.mu.RUnlock() + for _, sub := range h.subs { + if sub.projectID != "" && sub.projectID != rec.ProjectID { + continue + } + select { + case sub.ch <- rec: + default: + } + } + return nil +} diff --git a/backend/internal/notify/manager.go b/backend/internal/notify/manager.go new file mode 100644 index 0000000..cd951c2 --- /dev/null +++ b/backend/internal/notify/manager.go @@ -0,0 +1,83 @@ +// Package notify owns notification write-side production and live dashboard fan-out. +package notify + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/google/uuid" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" + "github.com/aoagents/agent-orchestrator/backend/internal/ports" +) + +// Store is the write-side notification persistence boundary. +type Store interface { + CreateNotification(ctx context.Context, rec domain.NotificationRecord) (domain.NotificationRecord, bool, error) +} + +// Publisher pushes newly persisted notifications to live dashboard subscribers. +type Publisher interface { + Publish(ctx context.Context, rec domain.NotificationRecord) error +} + +// Intent is the lifecycle-to-notification producer contract. +type Intent = ports.NotificationIntent + +// Manager validates lifecycle intents, enriches them into stored rows, persists +// unread notifications, and publishes newly inserted rows to live subscribers. +type Manager struct { + store Store + publisher Publisher + clock func() time.Time + newID func() string +} + +// Deps configures a Manager. +type Deps struct { + Store Store + Publisher Publisher + Clock func() time.Time + NewID func() string +} + +// New constructs a write-side notification manager. +func New(d Deps) *Manager { + m := &Manager{store: d.Store, publisher: d.Publisher, clock: d.Clock, newID: d.NewID} + if m.clock == nil { + m.clock = time.Now + } + if m.newID == nil { + m.newID = func() string { return "ntf_" + uuid.NewString() } + } + return m +} + +// Notify stores one notification intent and publishes it after persistence. +// Duplicate unread rows are treated as a clean no-op. +func (m *Manager) Notify(ctx context.Context, intent Intent) error { + if m == nil || m.store == nil { + return errors.New("notify: store is required") + } + if intent.CreatedAt.IsZero() { + intent.CreatedAt = m.clock().UTC() + } + rec, err := enrich(intent) + if err != nil { + return fmt.Errorf("notify enrich: %w", err) + } + rec.ID = m.newID() + created, inserted, err := m.store.CreateNotification(ctx, rec) + if err != nil { + return fmt.Errorf("notify store: %w", err) + } + if !inserted || m.publisher == nil { + return nil + } + if err := m.publisher.Publish(ctx, created); err != nil { + return fmt.Errorf("notify publish: %w", err) + } + return nil +} diff --git a/backend/internal/notify/manager_test.go b/backend/internal/notify/manager_test.go new file mode 100644 index 0000000..4751682 --- /dev/null +++ b/backend/internal/notify/manager_test.go @@ -0,0 +1,95 @@ +package notify + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" +) + +type fakeStore struct { + rows []domain.NotificationRecord + duplicate bool + err error +} + +func (f *fakeStore) CreateNotification(_ context.Context, rec domain.NotificationRecord) (domain.NotificationRecord, bool, error) { + if f.err != nil { + return domain.NotificationRecord{}, false, f.err + } + if f.duplicate { + return domain.NotificationRecord{}, false, nil + } + f.rows = append(f.rows, rec) + return rec, true, nil +} + +func TestManagerNotifyPersistsThenPublishes(t *testing.T) { + st := &fakeStore{} + hub := NewHub() + ch, unsub := hub.Subscribe("") + defer unsub() + now := time.Date(2026, 6, 11, 10, 0, 0, 0, time.UTC) + mgr := New(Deps{Store: st, Publisher: hub, Clock: func() time.Time { return now }, NewID: func() string { return "ntf_1" }}) + + if err := mgr.Notify(context.Background(), Intent{Type: domain.NotificationNeedsInput, SessionID: "mer-1", ProjectID: "mer", SessionDisplayName: "checkout-flow"}); err != nil { + t.Fatalf("Notify: %v", err) + } + if len(st.rows) != 1 { + t.Fatalf("stored rows = %d, want 1", len(st.rows)) + } + if got := st.rows[0]; got.ID != "ntf_1" || got.CreatedAt != now || got.Status != domain.NotificationUnread || got.Title != "checkout-flow needs input" { + t.Fatalf("stored notification = %+v", got) + } + select { + case got := <-ch: + if got.ID != "ntf_1" { + t.Fatalf("published = %+v", got) + } + default: + t.Fatal("expected published notification") + } +} + +func TestManagerNotifyDuplicateDoesNotPublish(t *testing.T) { + st := &fakeStore{duplicate: true} + hub := NewHub() + ch, unsub := hub.Subscribe("") + defer unsub() + mgr := New(Deps{Store: st, Publisher: hub, Clock: func() time.Time { return time.Now() }, NewID: func() string { return "ntf_1" }}) + + if err := mgr.Notify(context.Background(), Intent{Type: domain.NotificationNeedsInput, SessionID: "mer-1", ProjectID: "mer", CreatedAt: time.Now()}); err != nil { + t.Fatalf("Notify duplicate: %v", err) + } + select { + case got := <-ch: + t.Fatalf("duplicate published %+v", got) + default: + } +} + +func TestManagerNotifyRejectsUnknownType(t *testing.T) { + mgr := New(Deps{Store: &fakeStore{}, Clock: func() time.Time { return time.Now() }}) + err := mgr.Notify(context.Background(), Intent{Type: "surprise", SessionID: "mer-1", ProjectID: "mer"}) + if !errors.Is(err, domain.ErrInvalidNotificationType) { + t.Fatalf("err = %v, want invalid type", err) + } +} + +func TestHubProjectFilter(t *testing.T) { + hub := NewHub() + ch, unsub := hub.Subscribe("mer") + defer unsub() + _ = hub.Publish(context.Background(), domain.NotificationRecord{ID: "skip", ProjectID: "ao"}) + _ = hub.Publish(context.Background(), domain.NotificationRecord{ID: "keep", ProjectID: "mer"}) + select { + case got := <-ch: + if got.ID != "keep" { + t.Fatalf("published = %+v", got) + } + default: + t.Fatal("expected filtered notification") + } +} diff --git a/backend/internal/ports/notifications.go b/backend/internal/ports/notifications.go new file mode 100644 index 0000000..0c02200 --- /dev/null +++ b/backend/internal/ports/notifications.go @@ -0,0 +1,27 @@ +package ports + +import ( + "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" +) + +// NotificationIntent is the lifecycle-to-notification-producer contract. It is +// not an HTTP DTO; lifecycle fills it from facts it already has after the +// underlying session/PR state write succeeds. +type NotificationIntent struct { + Type domain.NotificationType + SessionID domain.SessionID + ProjectID domain.ProjectID + PRURL string + CreatedAt time.Time + + // Enrichment hints. These avoid storage reads on the hot path. + SessionDisplayName string + PRNumber int + PRTitle string + PRSourceBranch string + PRTargetBranch string + Provider string + Repo string +} diff --git a/backend/internal/ports/session.go b/backend/internal/ports/session.go index 14742ea..0c28f17 100644 --- a/backend/internal/ports/session.go +++ b/backend/internal/ports/session.go @@ -1,6 +1,13 @@ package ports -import "github.com/aoagents/agent-orchestrator/backend/internal/domain" +import ( + "errors" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" +) + +// ErrSessionNotFound reports an observation for an unknown session id. +var ErrSessionNotFound = errors.New("session not found") // SpawnConfig is the request to start a new session: which project/issue, which // agent harness, and the branch/prompt the agent launches with. diff --git a/backend/internal/service/notification/service.go b/backend/internal/service/notification/service.go new file mode 100644 index 0000000..25f92ed --- /dev/null +++ b/backend/internal/service/notification/service.go @@ -0,0 +1,68 @@ +package notification + +import ( + "context" + "errors" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" +) + +const ( + // DefaultListLimit is the unread notification page size used when none is requested. + DefaultListLimit = 50 + // MaxListLimit caps unread notification API responses. + MaxListLimit = 100 +) + +// Manager reads stored notifications for REST controllers. +type Manager struct { + store Store +} + +// Deps configures a Manager. +type Deps struct { + Store Store +} + +// New constructs a read-only notification Manager. +func New(d Deps) *Manager { + return &Manager{store: d.Store} +} + +// ListUnread returns unread notifications newest-first. +func (m *Manager) ListUnread(ctx context.Context, filter ListFilter) ([]Notification, error) { + if m == nil || m.store == nil { + return nil, errors.New("notification: store is required") + } + limit := normalizeLimit(filter.Limit) + rows, err := m.store.ListUnreadNotifications(ctx, limit) + if err != nil { + return nil, err + } + out := make([]Notification, 0, len(rows)) + for _, row := range rows { + out = append(out, notificationFromRecord(row)) + } + return out, nil +} + +func normalizeLimit(limit int) int { + if limit <= 0 { + return DefaultListLimit + } + if limit > MaxListLimit { + return MaxListLimit + } + return limit +} + +func notificationFromRecord(rec domain.NotificationRecord) Notification { + return Notification{NotificationRecord: rec, Target: targetForRecord(rec)} +} + +func targetForRecord(rec domain.NotificationRecord) Target { + if rec.PRURL != "" { + return Target{Kind: TargetPR, SessionID: rec.SessionID, PRURL: rec.PRURL} + } + return Target{Kind: TargetSession, SessionID: rec.SessionID} +} diff --git a/backend/internal/service/notification/service_test.go b/backend/internal/service/notification/service_test.go new file mode 100644 index 0000000..3330fa5 --- /dev/null +++ b/backend/internal/service/notification/service_test.go @@ -0,0 +1,44 @@ +package notification + +import ( + "context" + "testing" + "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" +) + +type fakeStore struct { + rows []domain.NotificationRecord + err error +} + +func (f *fakeStore) CreateNotification(context.Context, domain.NotificationRecord) (domain.NotificationRecord, bool, error) { + return domain.NotificationRecord{}, false, nil +} + +func (f *fakeStore) ListUnreadNotifications(_ context.Context, _ int) ([]domain.NotificationRecord, error) { + return f.rows, f.err +} + +func TestListUnreadAddsTargets(t *testing.T) { + st := &fakeStore{rows: []domain.NotificationRecord{ + {ID: "n1", SessionID: "mer-1", ProjectID: "mer", Type: domain.NotificationNeedsInput, Title: "needs", Status: domain.NotificationUnread, CreatedAt: time.Now()}, + {ID: "n2", SessionID: "mer-1", ProjectID: "mer", PRURL: "https://github.com/o/r/pull/1", Type: domain.NotificationReadyToMerge, Title: "ready", Status: domain.NotificationUnread, CreatedAt: time.Now()}, + }} + mgr := New(Deps{Store: st}) + got, err := mgr.ListUnread(context.Background(), ListFilter{Limit: 10}) + if err != nil { + t.Fatalf("ListUnread: %v", err) + } + if got[0].Target.Kind != TargetSession || got[1].Target.Kind != TargetPR || got[1].Target.PRURL == "" { + t.Fatalf("targets = %+v", got) + } +} + +func TestListUnreadRequiresStore(t *testing.T) { + _, err := New(Deps{}).ListUnread(context.Background(), ListFilter{}) + if err == nil { + t.Fatal("want missing store error") + } +} diff --git a/backend/internal/service/notification/store.go b/backend/internal/service/notification/store.go new file mode 100644 index 0000000..58be262 --- /dev/null +++ b/backend/internal/service/notification/store.go @@ -0,0 +1,12 @@ +package notification + +import ( + "context" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" +) + +// Store is the notification service's read persistence surface. +type Store interface { + ListUnreadNotifications(ctx context.Context, limit int) ([]domain.NotificationRecord, error) +} diff --git a/backend/internal/service/notification/types.go b/backend/internal/service/notification/types.go new file mode 100644 index 0000000..9d2673d --- /dev/null +++ b/backend/internal/service/notification/types.go @@ -0,0 +1,32 @@ +// Package notification exposes read-only notification DTOs for REST controllers. +package notification + +import "github.com/aoagents/agent-orchestrator/backend/internal/domain" + +// TargetKind describes what a dashboard should navigate to for a notification. +type TargetKind string + +const ( + // TargetSession navigates to a session detail view. + TargetSession TargetKind = "session" + // TargetPR navigates to a pull request view. + TargetPR TargetKind = "pr" +) + +// Target is the service-facing navigation metadata for a notification. +type Target struct { + Kind TargetKind + SessionID domain.SessionID + PRURL string +} + +// Notification is the dashboard-facing service DTO assembled from a stored row. +type Notification struct { + domain.NotificationRecord + Target Target +} + +// ListFilter controls unread notification listing. +type ListFilter struct { + Limit int +} diff --git a/backend/internal/storage/sqlite/gen/models.go b/backend/internal/storage/sqlite/gen/models.go index f8d614b..26610fe 100644 --- a/backend/internal/storage/sqlite/gen/models.go +++ b/backend/internal/storage/sqlite/gen/models.go @@ -21,6 +21,18 @@ type ChangeLog struct { CreatedAt time.Time } +type Notification struct { + ID string + SessionID domain.SessionID + ProjectID domain.ProjectID + PRURL string + Type domain.NotificationType + Title string + Body string + Status domain.NotificationStatus + CreatedAt time.Time +} + type PR struct { URL string SessionID domain.SessionID diff --git a/backend/internal/storage/sqlite/gen/notifications.sql.go b/backend/internal/storage/sqlite/gen/notifications.sql.go new file mode 100644 index 0000000..a70c6df --- /dev/null +++ b/backend/internal/storage/sqlite/gen/notifications.sql.go @@ -0,0 +1,130 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.31.1 +// source: notifications.sql + +package gen + +import ( + "context" + "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" +) + +const createNotification = `-- name: CreateNotification :one +INSERT INTO notifications ( + id, session_id, project_id, pr_url, type, title, body, status, created_at +) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) +RETURNING id, session_id, project_id, pr_url, type, title, body, status, created_at +` + +type CreateNotificationParams struct { + ID string + SessionID domain.SessionID + ProjectID domain.ProjectID + PRURL string + Type domain.NotificationType + Title string + Body string + Status domain.NotificationStatus + CreatedAt time.Time +} + +func (q *Queries) CreateNotification(ctx context.Context, arg CreateNotificationParams) (Notification, error) { + row := q.db.QueryRowContext(ctx, createNotification, + arg.ID, + arg.SessionID, + arg.ProjectID, + arg.PRURL, + arg.Type, + arg.Title, + arg.Body, + arg.Status, + arg.CreatedAt, + ) + var i Notification + err := row.Scan( + &i.ID, + &i.SessionID, + &i.ProjectID, + &i.PRURL, + &i.Type, + &i.Title, + &i.Body, + &i.Status, + &i.CreatedAt, + ) + return i, err +} + +const getUnreadNotificationByDedupe = `-- name: GetUnreadNotificationByDedupe :one +SELECT id, session_id, project_id, pr_url, type, title, body, status, created_at +FROM notifications +WHERE session_id = ? AND type = ? AND pr_url = ? AND status = 'unread' +LIMIT 1 +` + +type GetUnreadNotificationByDedupeParams struct { + SessionID domain.SessionID + Type domain.NotificationType + PRURL string +} + +func (q *Queries) GetUnreadNotificationByDedupe(ctx context.Context, arg GetUnreadNotificationByDedupeParams) (Notification, error) { + row := q.db.QueryRowContext(ctx, getUnreadNotificationByDedupe, arg.SessionID, arg.Type, arg.PRURL) + var i Notification + err := row.Scan( + &i.ID, + &i.SessionID, + &i.ProjectID, + &i.PRURL, + &i.Type, + &i.Title, + &i.Body, + &i.Status, + &i.CreatedAt, + ) + return i, err +} + +const listUnreadNotifications = `-- name: ListUnreadNotifications :many +SELECT id, session_id, project_id, pr_url, type, title, body, status, created_at +FROM notifications +WHERE status = 'unread' +ORDER BY created_at DESC +LIMIT ? +` + +func (q *Queries) ListUnreadNotifications(ctx context.Context, limit int64) ([]Notification, error) { + rows, err := q.db.QueryContext(ctx, listUnreadNotifications, limit) + if err != nil { + return nil, err + } + defer rows.Close() + items := []Notification{} + for rows.Next() { + var i Notification + if err := rows.Scan( + &i.ID, + &i.SessionID, + &i.ProjectID, + &i.PRURL, + &i.Type, + &i.Title, + &i.Body, + &i.Status, + &i.CreatedAt, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} diff --git a/backend/internal/storage/sqlite/migrations/0011_notifications.sql b/backend/internal/storage/sqlite/migrations/0011_notifications.sql new file mode 100644 index 0000000..9e24d95 --- /dev/null +++ b/backend/internal/storage/sqlite/migrations/0011_notifications.sql @@ -0,0 +1,35 @@ +-- +goose Up +-- +goose StatementBegin +CREATE TABLE notifications ( + id TEXT PRIMARY KEY, + session_id TEXT NOT NULL REFERENCES sessions(id) ON DELETE CASCADE, + project_id TEXT NOT NULL REFERENCES projects(id) ON DELETE CASCADE, + pr_url TEXT NOT NULL DEFAULT '', + type TEXT NOT NULL CHECK ( + type IN ( + 'needs_input', + 'ready_to_merge', + 'pr_merged', + 'pr_closed_unmerged' + ) + ), + title TEXT NOT NULL, + body TEXT NOT NULL DEFAULT '', + status TEXT NOT NULL DEFAULT 'unread' CHECK (status IN ('read', 'unread')), + created_at TIMESTAMP NOT NULL +); + +CREATE INDEX idx_notifications_status + ON notifications(status, created_at DESC); + +CREATE UNIQUE INDEX idx_notifications_unread_dedupe + ON notifications(session_id, type, pr_url) + WHERE status = 'unread'; +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin +DROP INDEX IF EXISTS idx_notifications_unread_dedupe; +DROP INDEX IF EXISTS idx_notifications_status; +DROP TABLE IF EXISTS notifications; +-- +goose StatementEnd diff --git a/backend/internal/storage/sqlite/queries/notifications.sql b/backend/internal/storage/sqlite/queries/notifications.sql new file mode 100644 index 0000000..bd8a75b --- /dev/null +++ b/backend/internal/storage/sqlite/queries/notifications.sql @@ -0,0 +1,19 @@ +-- name: CreateNotification :one +INSERT INTO notifications ( + id, session_id, project_id, pr_url, type, title, body, status, created_at +) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) +RETURNING *; + +-- name: ListUnreadNotifications :many +SELECT * +FROM notifications +WHERE status = 'unread' +ORDER BY created_at DESC +LIMIT ?; + + +-- name: GetUnreadNotificationByDedupe :one +SELECT * +FROM notifications +WHERE session_id = ? AND type = ? AND pr_url = ? AND status = 'unread' +LIMIT 1; diff --git a/backend/internal/storage/sqlite/store/notification_store.go b/backend/internal/storage/sqlite/store/notification_store.go new file mode 100644 index 0000000..ce54293 --- /dev/null +++ b/backend/internal/storage/sqlite/store/notification_store.go @@ -0,0 +1,105 @@ +package store + +import ( + "context" + "database/sql" + "errors" + "fmt" + + moderncsqlite "modernc.org/sqlite" + sqlite3 "modernc.org/sqlite/lib" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" + notificationsvc "github.com/aoagents/agent-orchestrator/backend/internal/service/notification" + "github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite/gen" +) + +var _ notificationsvc.Store = (*Store)(nil) + +// CreateNotification inserts one unread notification. It returns created=false +// when the unread dedupe index already has a matching row. +func (s *Store) CreateNotification(ctx context.Context, rec domain.NotificationRecord) (domain.NotificationRecord, bool, error) { + if err := rec.Validate(); err != nil { + return domain.NotificationRecord{}, false, err + } + s.writeMu.Lock() + defer s.writeMu.Unlock() + if existing, ok, err := s.getUnreadNotificationByDedupe(ctx, rec); err != nil { + return domain.NotificationRecord{}, false, err + } else if ok { + return existing, false, nil + } + row, err := s.qw.CreateNotification(ctx, gen.CreateNotificationParams{ + ID: rec.ID, + SessionID: rec.SessionID, + ProjectID: rec.ProjectID, + PRURL: rec.PRURL, + Type: rec.Type, + Title: rec.Title, + Body: rec.Body, + Status: rec.Status, + CreatedAt: rec.CreatedAt, + }) + if err != nil { + if isSQLiteUnique(err) { + if existing, ok, lookupErr := s.getUnreadNotificationByDedupe(ctx, rec); lookupErr != nil { + return domain.NotificationRecord{}, false, lookupErr + } else if ok { + return existing, false, nil + } + } + return domain.NotificationRecord{}, false, fmt.Errorf("create notification %s: %w", rec.ID, err) + } + return notificationFromGen(row), true, nil +} + +// ListUnreadNotifications returns unread notifications newest-first. +func (s *Store) ListUnreadNotifications(ctx context.Context, limit int) ([]domain.NotificationRecord, error) { + rows, err := s.qr.ListUnreadNotifications(ctx, int64(limit)) + if err != nil { + return nil, fmt.Errorf("list unread notifications: %w", err) + } + return notificationsFromGen(rows), nil +} + +func (s *Store) getUnreadNotificationByDedupe(ctx context.Context, rec domain.NotificationRecord) (domain.NotificationRecord, bool, error) { + row, err := s.qw.GetUnreadNotificationByDedupe(ctx, gen.GetUnreadNotificationByDedupeParams{ + SessionID: rec.SessionID, + Type: rec.Type, + PRURL: rec.PRURL, + }) + if errors.Is(err, sql.ErrNoRows) { + return domain.NotificationRecord{}, false, nil + } + if err != nil { + return domain.NotificationRecord{}, false, fmt.Errorf("lookup unread notification dedupe: %w", err) + } + return notificationFromGen(row), true, nil +} + +func isSQLiteUnique(err error) bool { + var sqliteErr *moderncsqlite.Error + return errors.As(err, &sqliteErr) && sqliteErr.Code() == sqlite3.SQLITE_CONSTRAINT_UNIQUE +} + +func notificationFromGen(row gen.Notification) domain.NotificationRecord { + return domain.NotificationRecord{ + ID: row.ID, + SessionID: row.SessionID, + ProjectID: row.ProjectID, + PRURL: row.PRURL, + Type: row.Type, + Title: row.Title, + Body: row.Body, + Status: row.Status, + CreatedAt: row.CreatedAt, + } +} + +func notificationsFromGen(rows []gen.Notification) []domain.NotificationRecord { + out := make([]domain.NotificationRecord, 0, len(rows)) + for _, row := range rows { + out = append(out, notificationFromGen(row)) + } + return out +} diff --git a/backend/internal/storage/sqlite/store/notification_store_test.go b/backend/internal/storage/sqlite/store/notification_store_test.go new file mode 100644 index 0000000..0b2ebf9 --- /dev/null +++ b/backend/internal/storage/sqlite/store/notification_store_test.go @@ -0,0 +1,90 @@ +package store_test + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" +) + +func TestNotificationStore_InsertListAndDedupe(t *testing.T) { + s := newTestStore(t) + ctx := context.Background() + seedProject(t, s, "mer") + sess, err := s.CreateSession(ctx, sampleRecord("mer")) + if err != nil { + t.Fatalf("create session: %v", err) + } + now := time.Now().UTC().Truncate(time.Second) + rec := domain.NotificationRecord{ + ID: "ntf_1", + SessionID: sess.ID, + ProjectID: sess.ProjectID, + Type: domain.NotificationNeedsInput, + Title: "checkout-flow needs input", + Status: domain.NotificationUnread, + CreatedAt: now, + } + created, inserted, err := s.CreateNotification(ctx, rec) + if err != nil || !inserted { + t.Fatalf("CreateNotification inserted=%v err=%v", inserted, err) + } + if created.ID != rec.ID || created.Title != rec.Title { + t.Fatalf("created = %+v", created) + } + dup := rec + dup.ID = "ntf_2" + _, inserted, err = s.CreateNotification(ctx, dup) + if err != nil || inserted { + t.Fatalf("duplicate inserted=%v err=%v, want false nil", inserted, err) + } + rows, err := s.ListUnreadNotifications(ctx, 10) + if err != nil { + t.Fatalf("ListUnreadNotifications: %v", err) + } + if len(rows) != 1 || rows[0].ID != "ntf_1" { + t.Fatalf("rows = %+v", rows) + } +} + +func TestNotificationStore_ListUnreadNewestFirstAcrossProjects(t *testing.T) { + s := newTestStore(t) + ctx := context.Background() + seedProject(t, s, "mer") + seedProject(t, s, "ao") + mer, _ := s.CreateSession(ctx, sampleRecord("mer")) + ao, _ := s.CreateSession(ctx, sampleRecord("ao")) + base := time.Now().UTC().Truncate(time.Second) + for _, rec := range []domain.NotificationRecord{ + {ID: "old", SessionID: mer.ID, ProjectID: mer.ProjectID, Type: domain.NotificationNeedsInput, Title: "old", Status: domain.NotificationUnread, CreatedAt: base}, + {ID: "new", SessionID: mer.ID, ProjectID: mer.ProjectID, PRURL: "https://github.com/o/r/pull/1", Type: domain.NotificationReadyToMerge, Title: "new", Status: domain.NotificationUnread, CreatedAt: base.Add(time.Minute)}, + {ID: "other", SessionID: ao.ID, ProjectID: ao.ProjectID, Type: domain.NotificationNeedsInput, Title: "other", Status: domain.NotificationUnread, CreatedAt: base.Add(2 * time.Minute)}, + } { + if _, inserted, err := s.CreateNotification(ctx, rec); err != nil || !inserted { + t.Fatalf("insert %s inserted=%v err=%v", rec.ID, inserted, err) + } + } + rows, err := s.ListUnreadNotifications(ctx, 2) + if err != nil { + t.Fatalf("ListUnreadNotifications: %v", err) + } + if len(rows) != 2 || rows[0].ID != "other" || rows[1].ID != "new" { + t.Fatalf("rows = %+v", rows) + } +} + +func TestNotificationStore_CheckConstraintRejectsInvalidStatus(t *testing.T) { + s := newTestStore(t) + ctx := context.Background() + seedProject(t, s, "mer") + sess, _ := s.CreateSession(ctx, sampleRecord("mer")) + _, _, err := s.CreateNotification(ctx, domain.NotificationRecord{ + ID: "bad", SessionID: sess.ID, ProjectID: sess.ProjectID, Type: domain.NotificationNeedsInput, + Title: "bad", Status: "archived", CreatedAt: time.Now(), + }) + if !errors.Is(err, domain.ErrInvalidNotificationStatus) { + t.Fatalf("err = %v, want invalid status", err) + } +} diff --git a/backend/sqlc.yaml b/backend/sqlc.yaml index 1813a59..4baabf6 100644 --- a/backend/sqlc.yaml +++ b/backend/sqlc.yaml @@ -56,6 +56,22 @@ sql: type: "PRCheckStatus" - column: "pr_comment.resolved" go_type: "bool" + - column: "notifications.session_id" + go_type: + import: "github.com/aoagents/agent-orchestrator/backend/internal/domain" + type: "SessionID" + - column: "notifications.project_id" + go_type: + import: "github.com/aoagents/agent-orchestrator/backend/internal/domain" + type: "ProjectID" + - column: "notifications.type" + go_type: + import: "github.com/aoagents/agent-orchestrator/backend/internal/domain" + type: "NotificationType" + - column: "notifications.status" + go_type: + import: "github.com/aoagents/agent-orchestrator/backend/internal/domain" + type: "NotificationStatus" - column: "projects.id" go_type: import: "github.com/aoagents/agent-orchestrator/backend/internal/domain" diff --git a/frontend/src/api/schema.ts b/frontend/src/api/schema.ts index 93d306a..45a3f99 100644 --- a/frontend/src/api/schema.ts +++ b/frontend/src/api/schema.ts @@ -21,6 +21,40 @@ export interface paths { patch?: never; trace?: never; }; + "/api/v1/notifications": { + parameters: { + query?: never; + header?: never; + path?: never; + cookie?: never; + }; + /** List unread notifications */ + get: operations["listNotifications"]; + put?: never; + post?: never; + delete?: never; + options?: never; + head?: never; + patch?: never; + trace?: never; + }; + "/api/v1/notifications/stream": { + parameters: { + query?: never; + header?: never; + path?: never; + cookie?: never; + }; + /** Stream created notifications */ + get: operations["streamNotifications"]; + put?: never; + post?: never; + delete?: never; + options?: never; + head?: never; + patch?: never; + trace?: never; + }; "/api/v1/orchestrators": { parameters: { query?: never; @@ -431,6 +465,9 @@ export interface components { ok: boolean; sessionId: string; }; + ListNotificationsResponse: { + notifications: components["schemas"]["NotificationResponse"][]; + }; ListProjectsResponse: { projects: components["schemas"]["ProjectSummary"][]; }; @@ -449,6 +486,27 @@ export interface components { ok: boolean; prNumber: number; }; + NotificationResponse: { + body: string; + /** Format: date-time */ + createdAt: string; + id: string; + prUrl: string; + projectId: string; + sessionId: string; + /** @enum {string} */ + status: "unread" | "read"; + target: components["schemas"]["NotificationTarget"]; + title: string; + /** @enum {string} */ + type: "needs_input" | "ready_to_merge" | "pr_merged" | "pr_closed_unmerged"; + }; + NotificationTarget: { + /** @enum {string} */ + kind: "session" | "pr"; + prUrl?: string; + sessionId: string; + }; OrchestratorResponse: { id: string; projectId: string; @@ -678,6 +736,99 @@ export interface operations { }; }; }; + listNotifications: { + parameters: { + query?: { + /** @description Notification status filter. V1 supports only unread. */ + status?: "unread"; + /** @description Maximum notifications to return. Defaults to 50; capped at 100. */ + limit?: number; + }; + header?: never; + path?: never; + cookie?: never; + }; + requestBody?: never; + responses: { + /** @description OK */ + 200: { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["ListNotificationsResponse"]; + }; + }; + /** @description Bad Request */ + 400: { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["APIError"]; + }; + }; + /** @description Internal Server Error */ + 500: { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["APIError"]; + }; + }; + /** @description Not Implemented */ + 501: { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["APIError"]; + }; + }; + }; + }; + streamNotifications: { + parameters: { + query?: { + /** @description Optional project id filter for live notifications. */ + projectId?: string; + }; + header?: never; + path?: never; + cookie?: never; + }; + requestBody?: never; + responses: { + /** @description OK */ + 200: { + headers: { + [name: string]: unknown; + }; + content: { + "text/event-stream": string; + }; + }; + /** @description Internal Server Error */ + 500: { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["APIError"]; + }; + }; + /** @description Not Implemented */ + 501: { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["APIError"]; + }; + }; + }; + }; listOrchestrators: { parameters: { query?: never;