Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 96 additions & 0 deletions internal/core/tools/correlate/argo_source.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Package correlate joins the read-only change signals cloudy can already
// observe (Kubernetes + Docker rollouts) with one external GitOps source —
// Argo CD sync history — into a single newest-first evidence chain, and names
// the most recent state-altering event as the candidate cause behind a
// symptom. Nothing here mutates cluster, host, or Argo state: every source is
// built from list/inspect/get reads, in line with cloudy's read-only contract.
package correlate

import (
"context"
"time"

"github.com/rlaope/cloudy/internal/core/tools"
"github.com/rlaope/cloudy/internal/core/tools/change"
"github.com/rlaope/cloudy/internal/core/tools/gitops"
)

// argoHistory is the slice of *gitops.ArgoClient that argoSource depends on.
// Declaring it locally keeps the change package decoupled from gitops and
// makes the source mockable in tests.
type argoHistory interface {
AppHistory(ctx context.Context, app string) ([]gitops.ArgoHistoryEntry, error)
}

// argoSource adapts an Argo CD endpoint to change.ChangeSource. The query's
// Workload is treated as an Argo Application name; Context selects the
// configured endpoint (first/default when empty).
type argoSource struct {
clients map[string]argoHistory
}

// newArgoSource builds an argoSource over the configured Argo clients. It
// returns nil when no client is wired so callers can omit the source.
func newArgoSource(clients map[string]*gitops.ArgoClient) *argoSource {
if len(clients) == 0 {
return nil
}
m := make(map[string]argoHistory, len(clients))
for name, c := range clients {
m[name] = c
}
return &argoSource{clients: m}
}

func (s *argoSource) Name() string { return "argo" }

// RecentChanges fetches q.Workload's sync history from the selected Argo
// endpoint and converts each entry into a "sync" ChangeEvent, applying the
// q.Since window. An empty q.Context resolves to the single configured
// endpoint (or errors when ambiguous).
func (s *argoSource) RecentChanges(ctx context.Context, q change.ChangeQuery) ([]change.ChangeEvent, error) {
client, err := tools.PickEndpoint(s.clients, q.Context, "correlate", "argo cd endpoint")
if err != nil {
return nil, err
}
entries, err := client.AppHistory(ctx, q.Workload)
if err != nil {
return nil, err
}
cutoff := time.Time{}
if q.Since > 0 {
cutoff = time.Now().Add(-q.Since)
}
return historyToEvents(q.Workload, entries, cutoff), nil
}

// historyToEvents converts Argo sync history into ChangeEvents. Entries whose
// DeployedAt does not parse as RFC3339 are skipped (they carry no usable time
// to align against other signals); entries older than cutoff are dropped when
// cutoff is non-zero. Output preserves the input order (Argo history is
// already newest-first); MergeSorted re-orders across sources.
func historyToEvents(app string, entries []gitops.ArgoHistoryEntry, cutoff time.Time) []change.ChangeEvent {
var out []change.ChangeEvent
for _, e := range entries {
t, err := time.Parse(time.RFC3339, e.DeployedAt)
if err != nil {
continue
}
if !cutoff.IsZero() && t.Before(cutoff) {
continue
}
summary := "argo sync"
if e.Source != "" {
summary = "argo sync from " + e.Source
}
out = append(out, change.ChangeEvent{
Time: t,
Kind: "sync",
Target: app,
Summary: summary,
After: e.Revision,
Source: "argo",
})
}
return out
}
134 changes: 134 additions & 0 deletions internal/core/tools/correlate/argo_source_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package correlate

import (
"context"
"errors"
"testing"
"time"

"github.com/rlaope/cloudy/internal/core/tools/change"
"github.com/rlaope/cloudy/internal/core/tools/gitops"
)

func TestHistoryToEvents(t *testing.T) {
t.Run("parses timestamp and maps fields", func(t *testing.T) {
entries := []gitops.ArgoHistoryEntry{
{Revision: "abc123", DeployedAt: "2026-05-28T12:00:00Z", Source: "git@repo"},
}
got := historyToEvents("app", entries, time.Time{})
if len(got) != 1 {
t.Fatalf("len = %d, want 1", len(got))
}
e := got[0]
if !e.Time.Equal(time.Date(2026, 5, 28, 12, 0, 0, 0, time.UTC)) {
t.Errorf("Time = %v, want 2026-05-28T12:00:00Z", e.Time)
}
if e.Kind != "sync" {
t.Errorf("Kind = %q, want sync", e.Kind)
}
if e.Target != "app" {
t.Errorf("Target = %q, want app", e.Target)
}
if e.After != "abc123" {
t.Errorf("After = %q, want abc123", e.After)
}
if e.Source != "argo" {
t.Errorf("Source = %q, want argo", e.Source)
}
if e.Summary != "argo sync from git@repo" {
t.Errorf("Summary = %q, want 'argo sync from git@repo'", e.Summary)
}
})

t.Run("drops unparseable timestamps", func(t *testing.T) {
entries := []gitops.ArgoHistoryEntry{
{Revision: "good", DeployedAt: "2026-05-28T12:00:00Z"},
{Revision: "bad", DeployedAt: "not-a-time"},
}
got := historyToEvents("app", entries, time.Time{})
if len(got) != 1 || got[0].After != "good" {
t.Fatalf("expected only the parseable entry, got %+v", got)
}
})

t.Run("applies since cutoff", func(t *testing.T) {
entries := []gitops.ArgoHistoryEntry{
{Revision: "recent", DeployedAt: "2026-05-28T12:00:00Z"},
{Revision: "old", DeployedAt: "2026-05-20T12:00:00Z"},
}
cutoff := time.Date(2026, 5, 25, 0, 0, 0, 0, time.UTC)
got := historyToEvents("app", entries, cutoff)
if len(got) != 1 || got[0].After != "recent" {
t.Fatalf("expected only the entry after the cutoff, got %+v", got)
}
})

t.Run("zero cutoff keeps everything", func(t *testing.T) {
entries := []gitops.ArgoHistoryEntry{
{Revision: "a", DeployedAt: "2020-01-01T00:00:00Z"},
{Revision: "b", DeployedAt: "2026-05-28T12:00:00Z"},
}
if got := historyToEvents("app", entries, time.Time{}); len(got) != 2 {
t.Fatalf("zero cutoff should keep all entries, got %d", len(got))
}
})
}

// mockArgoHistory is an argoHistory returning canned entries or an error.
type mockArgoHistory struct {
entries []gitops.ArgoHistoryEntry
err error
gotApp string
}

func (m *mockArgoHistory) AppHistory(_ context.Context, app string) ([]gitops.ArgoHistoryEntry, error) {
m.gotApp = app
return m.entries, m.err
}

func TestArgoSource_RecentChanges(t *testing.T) {
t.Run("converts history via the selected endpoint", func(t *testing.T) {
mock := &mockArgoHistory{entries: []gitops.ArgoHistoryEntry{
{Revision: "rev1", DeployedAt: "2026-05-28T12:00:00Z"},
}}
src := &argoSource{clients: map[string]argoHistory{"prod": mock}}
got, err := src.RecentChanges(context.Background(), change.ChangeQuery{Workload: "myapp"})
if err != nil {
t.Fatalf("RecentChanges: %v", err)
}
if mock.gotApp != "myapp" {
t.Errorf("AppHistory called with %q, want myapp", mock.gotApp)
}
if len(got) != 1 || got[0].After != "rev1" || got[0].Source != "argo" {
t.Fatalf("unexpected events: %+v", got)
}
})

t.Run("propagates client error", func(t *testing.T) {
src := &argoSource{clients: map[string]argoHistory{"prod": &mockArgoHistory{err: errors.New("boom")}}}
if _, err := src.RecentChanges(context.Background(), change.ChangeQuery{Workload: "app"}); err == nil {
t.Fatal("expected error from client")
}
})

t.Run("ambiguous endpoint errors when context empty", func(t *testing.T) {
src := &argoSource{clients: map[string]argoHistory{
"a": &mockArgoHistory{}, "b": &mockArgoHistory{},
}}
if _, err := src.RecentChanges(context.Background(), change.ChangeQuery{Workload: "app"}); err == nil {
t.Fatal("expected error selecting among multiple endpoints with empty context")
}
})
}

func TestNewArgoSource(t *testing.T) {
if newArgoSource(nil) != nil {
t.Error("newArgoSource(nil) should be nil")
}
if newArgoSource(map[string]*gitops.ArgoClient{}) != nil {
t.Error("newArgoSource(empty) should be nil")
}
if newArgoSource(map[string]*gitops.ArgoClient{"p": {}}) == nil {
t.Error("newArgoSource with a client should be non-nil")
}
}
31 changes: 31 additions & 0 deletions internal/core/tools/correlate/register.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package correlate

import (
dockerclient "github.com/rlaope/cloudy/internal/clients/docker"
k8sclient "github.com/rlaope/cloudy/internal/clients/k8s"
"github.com/rlaope/cloudy/internal/core/tools"
"github.com/rlaope/cloudy/internal/core/tools/change"
"github.com/rlaope/cloudy/internal/core/tools/gitops"
)

// RegisterAll adds the correlate.workload tool to reg, bound to whichever
// signal sources are available: a k8s change source when k8sHub is non-nil, a
// docker change source when dockerHub is non-nil, and an Argo CD sync source
// when at least one Argo client is wired. With no source available this is a
// no-op — the wiring layer marks the "correlate" group skipped instead.
func RegisterAll(reg *tools.Registry, k8sHub *k8sclient.Hub, dockerHub *dockerclient.Hub, argo map[string]*gitops.ArgoClient) {
var sources []change.ChangeSource
if k8sHub != nil {
sources = append(sources, change.NewK8sSource(k8sHub))
}
if dockerHub != nil {
sources = append(sources, change.NewDockerSource(dockerHub))
}
if src := newArgoSource(argo); src != nil {
sources = append(sources, src)
}
if len(sources) == 0 {
return
}
reg.MustRegister(newCorrelateTool(sources...))
}
Loading
Loading