Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 13 additions & 7 deletions backend/internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/aoagents/agent-orchestrator/backend/internal/config"
"github.com/aoagents/agent-orchestrator/backend/internal/httpd"
"github.com/aoagents/agent-orchestrator/backend/internal/runfile"
notificationsvc "github.com/aoagents/agent-orchestrator/backend/internal/service/notification"
projectsvc "github.com/aoagents/agent-orchestrator/backend/internal/service/project"
reviewsvc "github.com/aoagents/agent-orchestrator/backend/internal/service/review"
"github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite"
Expand Down Expand Up @@ -82,11 +83,15 @@ func Run() error {
// Built before the Lifecycle Manager so the LCM can use it for SCM-driven
// agent nudges (CI failure, review feedback, merge conflict).
messenger := newSessionMessenger(store, runtimeAdapter, log)
notifier := notificationsvc.New(notificationsvc.Deps{
Store: store,
Dispatcher: notificationsvc.NewDashboardDispatcher(nil),
})

// Bring up the Lifecycle Manager and the reaper first: it makes the session
// lifecycle write path live (reducer write -> store -> DB trigger ->
// change_log -> poller -> broadcaster) and gives startSession the shared LCM.
lcStack := startLifecycle(ctx, store, runtimeAdapter, messenger, log)
lcStack := startLifecycle(ctx, store, runtimeAdapter, messenger, notifier, log)
lcStack.scmDone = startSCMObserver(ctx, store, lcStack.LCM, log)

// Wire the controller-facing session service over the same store + LCM, the
Expand All @@ -104,12 +109,13 @@ func Run() error {
}

srv, err := httpd.NewWithDeps(cfg, log, termMgr, httpd.APIDeps{
Projects: projectsvc.New(store),
Sessions: sessionSvc,
Reviews: reviewsvc.NewInMemory(),
CDC: store,
Events: cdcPipe.Broadcaster,
Activity: lcStack.LCM,
Projects: projectsvc.New(store),
Sessions: sessionSvc,
Reviews: reviewsvc.NewInMemory(),
Notifications: notifier,
CDC: store,
Events: cdcPipe.Broadcaster,
Activity: lcStack.LCM,
})
if err != nil {
stop()
Expand Down
8 changes: 6 additions & 2 deletions backend/internal/daemon/lifecycle_wiring.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ import (
"github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite"
)

type notificationSink interface {
Notify(context.Context, ports.NotificationIntent) error
}

// lifecycleStack owns the runtime reaper goroutine started with the lifecycle
// reducer. The reducer itself is only used for wiring observations into storage.
type lifecycleStack struct {
Expand All @@ -35,8 +39,8 @@ type lifecycleStack struct {
// reaper. The goroutine stops when ctx is cancelled; Stop waits for it to drain.
// The messenger is the per-daemon agent messenger the LCM uses to nudge agents
// in response to SCM observations (CI failure, review feedback, merge conflict).
func startLifecycle(ctx context.Context, store *sqlite.Store, runtime ports.Runtime, messenger ports.AgentMessenger, logger *slog.Logger) *lifecycleStack {
lcm := lifecycle.New(store, messenger)
func startLifecycle(ctx context.Context, store *sqlite.Store, runtime ports.Runtime, messenger ports.AgentMessenger, notifier notificationSink, logger *slog.Logger) *lifecycleStack {
lcm := lifecycle.New(store, messenger, lifecycle.WithNotificationSink(notifier))
rp := reaper.New(lcm, store, runtime, reaper.Config{Logger: logger})
return &lifecycleStack{LCM: lcm, reaperDone: rp.Start(ctx)}
}
Expand Down
2 changes: 1 addition & 1 deletion backend/internal/daemon/wiring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ func TestWiring_StartLifecycleThreadsMessengerIntoLCM(t *testing.T) {

log := slog.New(slog.NewTextHandler(io.Discard, nil))
messenger := &captureMessenger{}
stack := startLifecycle(ctx, store, zellij.New(zellij.Options{}), messenger, log)
stack := startLifecycle(ctx, store, zellij.New(zellij.Options{}), messenger, nil, log)
t.Cleanup(stack.Stop)
t.Cleanup(cancel)

Expand Down
86 changes: 86 additions & 0 deletions backend/internal/domain/notification.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package domain

import (
"errors"
"time"
)

// NotificationType identifies a user-facing notification kind persisted for the dashboard.
type NotificationType string

const (
// NotificationNeedsInput means an agent session is waiting for user input.
NotificationNeedsInput NotificationType = "needs_input"
// NotificationReadyToMerge means a PR has no known merge blockers.
NotificationReadyToMerge NotificationType = "ready_to_merge"
// NotificationPRMerged means a tracked PR was merged.
NotificationPRMerged NotificationType = "pr_merged"
// NotificationPRClosedUnmerged means a tracked PR closed without merging.
NotificationPRClosedUnmerged NotificationType = "pr_closed_unmerged"
)

// Valid reports whether t is one of the v1 notification kinds.
func (t NotificationType) Valid() bool {
switch t {
case NotificationNeedsInput, NotificationReadyToMerge, NotificationPRMerged, NotificationPRClosedUnmerged:
return true
default:
return false
}
}

// NotificationStatus is the read state for a stored notification.
type NotificationStatus string

const (
// NotificationUnread marks a notification that has not been acknowledged.
NotificationUnread NotificationStatus = "unread"
// NotificationRead marks a notification that has been acknowledged.
NotificationRead NotificationStatus = "read"
)

// Valid reports whether s is a supported notification read state.
func (s NotificationStatus) Valid() bool {
switch s {
case NotificationUnread, NotificationRead:
return true
default:
return false
}
}

// NotificationRecord is the durable notification persistence shape.
type NotificationRecord struct {
ID string
SessionID SessionID
ProjectID ProjectID
PRURL string
Type NotificationType
Title string
Body string
Status NotificationStatus
CreatedAt time.Time
}

var (
// ErrInvalidNotificationType reports an unknown notification type.
ErrInvalidNotificationType = errors.New("invalid notification type")
// ErrInvalidNotificationStatus reports an unknown notification status.
ErrInvalidNotificationStatus = errors.New("invalid notification status")
// ErrInvalidNotificationRecord reports a missing required notification field.
ErrInvalidNotificationRecord = errors.New("invalid notification record")
)

// Validate checks the required fields and enum values for a stored notification.
func (r NotificationRecord) Validate() error {
if r.SessionID == "" || r.ProjectID == "" || r.Title == "" || r.CreatedAt.IsZero() {
return ErrInvalidNotificationRecord
}
if !r.Type.Valid() {
return ErrInvalidNotificationType
}
if !r.Status.Valid() {
return ErrInvalidNotificationStatus
}
return nil
}
36 changes: 20 additions & 16 deletions backend/internal/httpd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,26 @@ import (

// APIDeps bundles every service the API layer's controllers depend on.
type APIDeps struct {
Projects projectsvc.Manager
Sessions controllers.SessionService
Activity controllers.ActivityRecorder
PRs prsvc.ActionManager
Reviews reviewsvc.Manager
CDC cdc.Source
Events cdcSubscriber
Projects projectsvc.Manager
Sessions controllers.SessionService
Activity controllers.ActivityRecorder
PRs prsvc.ActionManager
Reviews reviewsvc.Manager
Notifications controllers.NotificationService
CDC cdc.Source
Events cdcSubscriber
}

// API owns one controller per resource and is the single Register call the
// router invokes to mount the /api/v1 surface.
type API struct {
cfg config.Config
projects *controllers.ProjectsController
sessions *controllers.SessionsController
prs *controllers.PRsController
reviews *controllers.ReviewsController
events *EventsController
cfg config.Config
projects *controllers.ProjectsController
sessions *controllers.SessionsController
prs *controllers.PRsController
reviews *controllers.ReviewsController
notifications *controllers.NotificationsController
events *EventsController
}

// NewAPI constructs the API surface from its dependencies. cfg carries the
Expand All @@ -51,9 +53,10 @@ func NewAPI(cfg config.Config, deps APIDeps) *API {
Svc: deps.Sessions,
Activity: deps.Activity,
},
prs: &controllers.PRsController{Svc: deps.PRs},
reviews: &controllers.ReviewsController{Svc: deps.Reviews},
events: &EventsController{Source: deps.CDC, Live: deps.Events},
prs: &controllers.PRsController{Svc: deps.PRs},
reviews: &controllers.ReviewsController{Svc: deps.Reviews},
notifications: &controllers.NotificationsController{Svc: deps.Notifications},
events: &EventsController{Source: deps.CDC, Live: deps.Events},
}
}

Expand All @@ -75,6 +78,7 @@ func (a *API) Register(root chi.Router) {
a.sessions.Register(r)
a.prs.Register(r)
a.reviews.Register(r)
a.notifications.Register(r)
// Sibling REST controllers plug in here.
})
// Long-lived streams intentionally bypass the REST timeout middleware.
Expand Down
124 changes: 124 additions & 0 deletions backend/internal/httpd/apispec/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,61 @@ paths:
summary: Stream CDC events with durable replay
tags:
- events
/api/v1/notifications:
get:
operationId: listNotifications
parameters:
- description: Notification status filter. V1 supports only unread.
in: query
name: status
schema:
description: Notification status filter. V1 supports only unread.
enum:
- unread
type: string
- description: Optional project id filter.
in: query
name: projectId
schema:
description: Optional project id filter.
type: string
- description: Maximum notifications to return. Defaults to 50; capped at 100.
in: query
name: limit
schema:
description: Maximum notifications to return. Defaults to 50; capped at
100.
maximum: 100
minimum: 1
type: integer
responses:
"200":
content:
application/json:
schema:
$ref: '#/components/schemas/ListNotificationsResponse'
description: OK
"400":
content:
application/json:
schema:
$ref: '#/components/schemas/APIError'
description: Bad Request
"500":
content:
application/json:
schema:
$ref: '#/components/schemas/APIError'
description: Internal Server Error
"501":
content:
application/json:
schema:
$ref: '#/components/schemas/APIError'
description: Not Implemented
summary: List unread notifications
tags:
- notifications
/api/v1/orchestrators:
get:
operationId: listOrchestrators
Expand Down Expand Up @@ -1186,6 +1241,15 @@ components:
- ok
- sessionId
type: object
ListNotificationsResponse:
properties:
notifications:
items:
$ref: '#/components/schemas/NotificationResponse'
type: array
required:
- notifications
type: object
ListProjectsResponse:
properties:
projects:
Expand Down Expand Up @@ -1238,6 +1302,64 @@ components:
- prNumber
- method
type: object
NotificationResponse:
properties:
body:
type: string
createdAt:
format: date-time
type: string
id:
type: string
prUrl:
type: string
projectId:
type: string
sessionId:
type: string
status:
enum:
- unread
- read
type: string
target:
$ref: '#/components/schemas/NotificationTarget'
title:
type: string
type:
enum:
- needs_input
- ready_to_merge
- pr_merged
- pr_closed_unmerged
type: string
required:
- id
- sessionId
- projectId
- prUrl
- type
- title
- body
- status
- createdAt
- target
type: object
NotificationTarget:
properties:
kind:
enum:
- session
- pr
type: string
prUrl:
type: string
sessionId:
type: string
required:
- kind
- sessionId
type: object
OrchestratorResponse:
properties:
id:
Expand Down Expand Up @@ -1691,5 +1813,7 @@ tags:
name: prs
- description: Code-review runs and findings
name: reviews
- description: Durable dashboard notifications
name: notifications
- description: Server-sent CDC event stream with durable replay
name: events
Loading
Loading