From d21266c44a1b624b2d60f2008c6bebddebe09470 Mon Sep 17 00:00:00 2001
From: rlaope
Date: Thu, 28 May 2026 16:31:24 +0900
Subject: [PATCH] feat(tools): add cross-signal correlation with ArgoCD timeline (Phase 4/4)

Add a read-only correlate.workload tool that merges the change timeline
(k8s + docker) with ArgoCD sync history into one newest-first evidence
chain and surfaces a candidate-cause line (the most recent non-event
change/sync). ArgoCD is integrated as a change.ChangeSource via a new
exported ArgoClient.AppHistory wrapper over the existing read-only v1 API.

v1 correlates change + k8s events + Argo sync; Prometheus-metric and
log/trace correlation are deferred to v2. Read-only, RiskLow.

Phase 4 of #103.

Signed-off-by: rlaope
---
 internal/core/tools/correlate/argo_source.go  |  96 +++++++++
 .../core/tools/correlate/argo_source_test.go  | 134 +++++++++++++
 internal/core/tools/correlate/register.go     |  31 +++
 internal/core/tools/correlate/tool.go         | 186 ++++++++++++++++++
 internal/core/tools/correlate/tool_test.go    | 174 ++++++++++++++++
 internal/core/tools/gitops/argocd.go          |  13 ++
 internal/wiring/skills_test.go                |   2 +
 internal/wiring/tools.go                      |  11 ++
 8 files changed, 647 insertions(+)
 create mode 100644 internal/core/tools/correlate/argo_source.go
 create mode 100644 internal/core/tools/correlate/argo_source_test.go
 create mode 100644 internal/core/tools/correlate/register.go
 create mode 100644 internal/core/tools/correlate/tool.go
 create mode 100644 internal/core/tools/correlate/tool_test.go

diff --git a/internal/core/tools/correlate/argo_source.go b/internal/core/tools/correlate/argo_source.go
new file mode 100644
index 0000000..b0945c9
--- /dev/null
+++ b/internal/core/tools/correlate/argo_source.go
@@ -0,0 +1,96 @@
+// Package correlate joins the read-only change signals cloudy can already
+// observe (Kubernetes + Docker rollouts) with one external GitOps source —
+// Argo CD sync history — into a single newest-first evidence chain, and names
+// the most recent state-altering event as the candidate cause behind a
+// symptom. Nothing here mutates cluster, host, or Argo state: every source is
+// built from list/inspect/get reads, in line with cloudy's read-only contract.
+package correlate
+
+import (
+	"context"
+	"time"
+
+	"github.com/rlaope/cloudy/internal/core/tools"
+	"github.com/rlaope/cloudy/internal/core/tools/change"
+	"github.com/rlaope/cloudy/internal/core/tools/gitops"
+)
+
+// argoHistory is the slice of *gitops.ArgoClient that argoSource depends on.
+// Declaring it here, at the consumer, narrows the dependency to the one call
+// the source needs and makes the source mockable in tests.
+type argoHistory interface {
+	AppHistory(ctx context.Context, app string) ([]gitops.ArgoHistoryEntry, error)
+}
+
+// argoSource adapts an Argo CD endpoint to change.ChangeSource. The query's
+// Workload is treated as an Argo Application name; Context selects the
+// configured endpoint and may be empty only when a single endpoint exists.
+type argoSource struct {
+	clients map[string]argoHistory
+}
+
+// newArgoSource builds an argoSource over the configured Argo clients. It
+// returns nil when no client is wired so callers can omit the source.
+func newArgoSource(clients map[string]*gitops.ArgoClient) *argoSource {
+	if len(clients) == 0 {
+		return nil
+	}
+	m := make(map[string]argoHistory, len(clients))
+	for name, c := range clients {
+		m[name] = c
+	}
+	return &argoSource{clients: m}
+}
+
+func (s *argoSource) Name() string { return "argo" }
+
+// RecentChanges fetches q.Workload's sync history from the selected Argo
+// endpoint and converts each entry into a "sync" ChangeEvent, applying the
+// q.Since window. An empty q.Context resolves to the single configured
+// endpoint (or errors when ambiguous).
+func (s *argoSource) RecentChanges(ctx context.Context, q change.ChangeQuery) ([]change.ChangeEvent, error) {
+	endpoint, err := tools.PickEndpoint(s.clients, q.Context, "correlate", "argo cd endpoint")
+	if err != nil {
+		return nil, err
+	}
+	history, err := endpoint.AppHistory(ctx, q.Workload)
+	if err != nil {
+		return nil, err
+	}
+	var notBefore time.Time // zero value = no lower bound
+	if q.Since > 0 {
+		notBefore = time.Now().Add(-q.Since)
+	}
+	return historyToEvents(q.Workload, history, notBefore), nil
+}
+
+// historyToEvents maps Argo sync history onto "sync" ChangeEvents targeting
+// app. An entry is dropped when its DeployedAt is not valid RFC3339 (it cannot
+// be placed on the timeline) or when it predates a non-zero cutoff. Entries
+// keep their input order; ordering across sources is left to the caller's
+// merge step.
+func historyToEvents(app string, entries []gitops.ArgoHistoryEntry, cutoff time.Time) []change.ChangeEvent {
+	windowed := !cutoff.IsZero()
+	var events []change.ChangeEvent
+	for i := range entries {
+		entry := &entries[i]
+		deployed, err := time.Parse(time.RFC3339, entry.DeployedAt)
+		if err != nil || (windowed && deployed.Before(cutoff)) {
+			continue // no usable timestamp, or outside the window
+		}
+		// Each surviving entry becomes one "sync" event keyed on the app name.
+		ev := change.ChangeEvent{
+			Time:    deployed,
+			Kind:    "sync",
+			Target:  app,
+			Summary: "argo sync",
+			After:   entry.Revision,
+			Source:  "argo",
+		}
+		if entry.Source != "" {
+			ev.Summary += " from " + entry.Source
+		}
+		events = append(events, ev)
+	}
+	return events
+}
diff --git a/internal/core/tools/correlate/argo_source_test.go b/internal/core/tools/correlate/argo_source_test.go
new file mode 100644
index 0000000..0a0ed34
--- /dev/null
+++ b/internal/core/tools/correlate/argo_source_test.go
@@ -0,0 +1,134 @@
+package correlate
+
+import (
+	"context"
+	"errors"
+	"testing"
+	"time"
+
+	"github.com/rlaope/cloudy/internal/core/tools/change"
+	"github.com/rlaope/cloudy/internal/core/tools/gitops"
+)
+
+func TestHistoryToEvents(t *testing.T) {
+	t.Run("parses timestamp and maps fields", func(t *testing.T) {
+		entries := []gitops.ArgoHistoryEntry{
+			{Revision: "abc123", DeployedAt: "2026-05-28T12:00:00Z", Source: "git@repo"},
+		}
+		got := historyToEvents("app", entries, time.Time{})
+		if len(got) != 1 {
+			t.Fatalf("len = %d, want 1", len(got))
+		}
+		e := got[0]
+		if !e.Time.Equal(time.Date(2026, 5, 28, 12, 0, 0, 0, time.UTC)) {
+			t.Errorf("Time = %v, want 2026-05-28T12:00:00Z", e.Time)
+		}
+		if e.Kind != "sync" {
+			t.Errorf("Kind = %q, want sync", e.Kind)
+		}
+		if e.Target != "app" {
+			t.Errorf("Target = %q, want app", e.Target)
+		}
+		if e.After != "abc123" {
+			t.Errorf("After = %q, want abc123", e.After)
+		}
+		if e.Source != "argo" {
+			t.Errorf("Source = %q, want argo", e.Source)
+		}
+		if e.Summary != "argo sync from git@repo" {
+			t.Errorf("Summary = %q, want 'argo sync from git@repo'", e.Summary)
+		}
+	})
+
+	t.Run("drops unparseable timestamps", func(t *testing.T) {
+		entries := []gitops.ArgoHistoryEntry{
+			{Revision: "good", DeployedAt: "2026-05-28T12:00:00Z"},
+			{Revision: "bad", DeployedAt: "not-a-time"},
+		}
+		got := historyToEvents("app", entries, time.Time{})
+		if len(got) != 1 || got[0].After != "good" {
+			t.Fatalf("expected only the parseable entry, got %+v", got)
+		}
+	})
+
+	t.Run("applies since cutoff", func(t *testing.T) {
+		entries := []gitops.ArgoHistoryEntry{
+			{Revision: "recent", DeployedAt: "2026-05-28T12:00:00Z"},
+			{Revision: "old", DeployedAt: "2026-05-20T12:00:00Z"},
+		}
+		cutoff := time.Date(2026, 5, 25, 0, 0, 0, 0, time.UTC)
+		got := historyToEvents("app", entries, cutoff)
+		if len(got) != 1 || got[0].After != "recent" {
+			t.Fatalf("expected only the entry after the cutoff, got %+v", got)
+		}
+	})
+
+	t.Run("zero cutoff keeps everything", func(t *testing.T) {
+		entries := []gitops.ArgoHistoryEntry{
+			{Revision: "a", DeployedAt: "2020-01-01T00:00:00Z"},
+			{Revision: "b", DeployedAt: "2026-05-28T12:00:00Z"},
+		}
+		if got := historyToEvents("app", entries, time.Time{}); len(got) != 2 {
+			t.Fatalf("zero cutoff should keep all entries, got %d", len(got))
+		}
+	})
+}
+
+// mockArgoHistory is an argoHistory returning canned entries or an error.
+type mockArgoHistory struct {
+	entries []gitops.ArgoHistoryEntry
+	err     error
+	gotApp  string
+}
+
+func (m *mockArgoHistory) AppHistory(_ context.Context, app string) ([]gitops.ArgoHistoryEntry, error) {
+	m.gotApp = app
+	return m.entries, m.err
+}
+
+func TestArgoSource_RecentChanges(t *testing.T) {
+	t.Run("converts history via the selected endpoint", func(t *testing.T) {
+		mock := &mockArgoHistory{entries: []gitops.ArgoHistoryEntry{
+			{Revision: "rev1", DeployedAt: "2026-05-28T12:00:00Z"},
+		}}
+		src := &argoSource{clients: map[string]argoHistory{"prod": mock}}
+		got, err := src.RecentChanges(context.Background(), change.ChangeQuery{Workload: "myapp"})
+		if err != nil {
+			t.Fatalf("RecentChanges: %v", err)
+		}
+		if mock.gotApp != "myapp" {
+			t.Errorf("AppHistory called with %q, want myapp", mock.gotApp)
+		}
+		if len(got) != 1 || got[0].After != "rev1" || got[0].Source != "argo" {
+			t.Fatalf("unexpected events: %+v", got)
+		}
+	})
+
+	t.Run("propagates client error", func(t *testing.T) {
+		src := &argoSource{clients: map[string]argoHistory{"prod": &mockArgoHistory{err: errors.New("boom")}}}
+		if _, err := src.RecentChanges(context.Background(), change.ChangeQuery{Workload: "app"}); err == nil {
+			t.Fatal("expected error from client")
+		}
+	})
+
+	t.Run("ambiguous endpoint errors when context empty", func(t *testing.T) {
+		src := &argoSource{clients: map[string]argoHistory{
+			"a": &mockArgoHistory{}, "b": &mockArgoHistory{},
+		}}
+		if _, err := src.RecentChanges(context.Background(), change.ChangeQuery{Workload: "app"}); err == nil {
+			t.Fatal("expected error selecting among multiple endpoints with empty context")
+		}
+	})
+}
+
+func TestNewArgoSource(t *testing.T) {
+	if newArgoSource(nil) != nil {
+		t.Error("newArgoSource(nil) should be nil")
+	}
+	if newArgoSource(map[string]*gitops.ArgoClient{}) != nil {
+		t.Error("newArgoSource(empty) should be nil")
+	}
+	if newArgoSource(map[string]*gitops.ArgoClient{"p": {}}) == nil {
+		t.Error("newArgoSource with a client should be non-nil")
+	}
+}
diff --git a/internal/core/tools/correlate/register.go b/internal/core/tools/correlate/register.go
new file mode 100644
index 0000000..a65da0d
--- /dev/null
+++ b/internal/core/tools/correlate/register.go
@@ -0,0 +1,31 @@
+package correlate
+
+import (
+	dockerclient "github.com/rlaope/cloudy/internal/clients/docker"
+	k8sclient "github.com/rlaope/cloudy/internal/clients/k8s"
+	"github.com/rlaope/cloudy/internal/core/tools"
+	"github.com/rlaope/cloudy/internal/core/tools/change"
+	"github.com/rlaope/cloudy/internal/core/tools/gitops"
+)
+
+// RegisterAll adds the correlate.workload tool to reg, bound to whichever
+// signal sources are available: a k8s change source when k8sHub is non-nil, a
+// docker change source when dockerHub is non-nil, and an Argo CD sync source
+// when at least one Argo client is wired. With no source available this is a
+// no-op — the wiring layer marks the "correlate" group skipped instead.
+func RegisterAll(reg *tools.Registry, k8sHub *k8sclient.Hub, dockerHub *dockerclient.Hub, argo map[string]*gitops.ArgoClient) {
+	var sources []change.ChangeSource
+	if k8sHub != nil {
+		sources = append(sources, change.NewK8sSource(k8sHub))
+	}
+	if dockerHub != nil {
+		sources = append(sources, change.NewDockerSource(dockerHub))
+	}
+	if src := newArgoSource(argo); src != nil {
+		sources = append(sources, src)
+	}
+	if len(sources) == 0 {
+		return
+	}
+	reg.MustRegister(newCorrelateTool(sources...))
+}
diff --git a/internal/core/tools/correlate/tool.go b/internal/core/tools/correlate/tool.go
new file mode 100644
index 0000000..ddd88f1
--- /dev/null
+++ b/internal/core/tools/correlate/tool.go
@@ -0,0 +1,186 @@
+package correlate
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"strings"
+	"time"
+
+	"github.com/rlaope/cloudy/internal/core/tools"
+	"github.com/rlaope/cloudy/internal/core/tools/change"
+)
+
+// defaultSince is used when the caller omits `since` or supplies a value that
+// is not a positive Go duration (unparseable, zero, or negative).
+const defaultSince = time.Hour
+
+// defaultLimit caps the merged timeline when the caller omits `limit`.
+const defaultLimit = 50
+
+// causeKinds is the set of state-altering change kinds that can plausibly be
+// the cause behind a symptom. "event" is excluded: a Kubernetes event is a
+// report of a condition, not the change that produced it.
+var causeKinds = map[string]bool{
+	"image":             true,
+	"rollout":           true,
+	"scale":             true,
+	"sync":              true,
+	"container_restart": true,
+	"container_create":  true,
+	"image_pull":        true,
+}
+
+type correlateArgs struct {
+	Workload  string `json:"workload"`
+	Namespace string `json:"namespace"`
+	Context   string `json:"context"`
+	Since     string `json:"since"`
+	Limit     int    `json:"limit"`
+}
+
+// correlateTool joins the available change sources (k8s, docker, argo) into one
+// newest-first evidence timeline for a workload and names the most recent
+// state-altering event as the likeliest correlate of a current symptom. It is
+// hand-written rather than built via Spec[Args] so it can advertise RiskLow
+// through the RiskRated interface.
+type correlateTool struct {
+	sources []change.ChangeSource
+}
+
+// newCorrelateTool returns the correlate.workload tool bound to the supplied
+// sources. At least one source is expected.
+func newCorrelateTool(sources ...change.ChangeSource) tools.Tool {
+	return &correlateTool{sources: sources}
+}
+
+func (t *correlateTool) Name() string { return "correlate.workload" }
+
+func (t *correlateTool) Description() string {
+	return "Correlate recent changes for a workload across signals — Kubernetes/Docker change history and Argo CD sync history — into one newest-first evidence timeline, and name the most recent state-altering event as the likeliest cause of a current symptom. Read-only."
+}
+
+func (t *correlateTool) Schema() json.RawMessage {
+	str := func(desc string) map[string]any { return map[string]any{"type": "string", "description": desc} }
+	s := map[string]any{
+		"type": "object",
+		"properties": map[string]any{
+			"workload":  str("Workload to correlate (deployment/statefulset/daemonset, container/compose service, or Argo CD application name). Required."),
+			"namespace": str("Kubernetes namespace to scope the search; ignored by the Docker and Argo sources."),
+			"context":   str("kubeconfig context, Docker host, or Argo CD endpoint name to query; empty = each backend's default."),
+			"since":     str("How far back to look, as a positive Go duration (e.g. \"1h\", \"90m\"); default \"1h\"."),
+			"limit":     map[string]any{"type": "integer", "description": "Maximum number of timeline events to return; default 50."},
+		},
+		"required": []string{"workload"},
+	}
+	b, err := json.Marshal(s)
+	if err != nil {
+		panic("correlate: schema marshal: " + err.Error())
+	}
+	return b
+}
+
+// Risk implements tools.RiskRated. correlate.workload only fans out to
+// list/inspect reads already classified RiskLow elsewhere.
+func (t *correlateTool) Risk() tools.RiskLevel { return tools.RiskLow }
+
+func (t *correlateTool) Run(ctx context.Context, raw json.RawMessage) (tools.Observation, error) {
+	var a correlateArgs
+	if len(raw) > 0 {
+		if err := json.Unmarshal(raw, &a); err != nil {
+			return tools.Observation{}, fmt.Errorf("correlate.workload: parse args: %w", err)
+		}
+	}
+	if a.Workload == "" {
+		return tools.Observation{Text: "correlate.workload: workload is required"}, nil
+	}
+
+	since := defaultSince // kept unless a.Since parses to a positive duration
+	if a.Since != "" {
+		if d, err := time.ParseDuration(a.Since); err == nil && d > 0 {
+			since = d
+		}
+	}
+	limit := defaultLimit
+	if a.Limit > 0 {
+		limit = a.Limit
+	}
+
+	if len(t.sources) == 0 {
+		return tools.Observation{Text: "correlate.workload: no change sources available"}, nil
+	}
+
+	q := change.ChangeQuery{
+		Workload:  a.Workload,
+		Namespace: a.Namespace,
+		Context:   a.Context,
+		Since:     since,
+		Limit:     limit,
+	}
+
+	var groups [][]change.ChangeEvent
+	var failures []string
+	for _, src := range t.sources {
+		events, err := src.RecentChanges(ctx, q)
+		if err != nil {
+			failures = append(failures, fmt.Sprintf("%s: %v", src.Name(), err))
+			continue
+		}
+		groups = append(groups, events)
+	}
+
+	// Only hard-error when every source failed; partial success still returns.
+	if len(groups) == 0 {
+		return tools.Observation{}, fmt.Errorf("correlate.workload: all sources failed: %s", strings.Join(failures, "; "))
+	}
+
+	merged := change.MergeSorted(limit, groups...)
+	return tools.Observation{
+		Text: renderCorrelation(a.Workload, since, merged, failures),
+		Raw:  merged,
+	}, nil
+}
+
+// candidateCause returns the newest event whose Kind is state-altering (in
+// causeKinds), i.e. the likeliest correlate of a current symptom. events must
+// already be newest-first (MergeSorted output). Returns nil when none qualify.
+func candidateCause(events []change.ChangeEvent) *change.ChangeEvent {
+	for i := range events {
+		if causeKinds[events[i].Kind] {
+			return &events[i]
+		}
+	}
+	return nil
+}
+
+// renderCorrelation formats the evidence timeline newest-first followed by the
+// candidate-cause line. Each timeline line is
+// "RFC3339 | source | kind | target | summary | before→after"; the before→after
+// segment is omitted when both are empty. Per-source failures are noted so a
+// partial result is still actionable.
+func renderCorrelation(workload string, since time.Duration, events []change.ChangeEvent, failures []string) string {
+	var out strings.Builder
+	fmt.Fprintf(&out, "%d event(s) for %q in the last %s (newest first)\n", len(events), workload, since)
+	for i := range events {
+		ev := &events[i]
+		fmt.Fprintf(&out, "%s | %s | %s | %s | %s", ev.Time.UTC().Format(time.RFC3339), ev.Source, ev.Kind, ev.Target, ev.Summary)
+		if ev.Before != "" || ev.After != "" {
+			fmt.Fprintf(&out, " | %s→%s", ev.Before, ev.After)
+		}
+		out.WriteByte('\n')
+	}
+	switch cause := candidateCause(events); {
+	case cause == nil:
+		out.WriteString("candidate cause: none — no state-altering change in the window\n")
+	default:
+		fmt.Fprintf(&out, "candidate cause: %s %s on %s @ %s", cause.Source, cause.Kind, cause.Target, cause.Time.UTC().Format(time.RFC3339))
+		if cause.Before != "" || cause.After != "" {
+			fmt.Fprintf(&out, " (%s→%s)", cause.Before, cause.After)
+		}
+		out.WriteByte('\n')
+	}
+	if len(failures) > 0 {
+		fmt.Fprintf(&out, "note: %d source(s) failed: %s\n", len(failures), strings.Join(failures, "; "))
+	}
+	return strings.TrimRight(out.String(), "\n")
+}
diff --git a/internal/core/tools/correlate/tool_test.go b/internal/core/tools/correlate/tool_test.go
new file mode 100644
index 0000000..8c719cb
--- /dev/null
+++ b/internal/core/tools/correlate/tool_test.go
@@ -0,0 +1,174 @@
+package correlate
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/rlaope/cloudy/internal/core/tools"
+	"github.com/rlaope/cloudy/internal/core/tools/change"
+)
+
+// fakeSource is a change.ChangeSource returning canned events or an error.
+type fakeSource struct { + name string + events []change.ChangeEvent + err error +} + +func (f fakeSource) Name() string { return f.name } + +func (f fakeSource) RecentChanges(_ context.Context, _ change.ChangeQuery) ([]change.ChangeEvent, error) { + return f.events, f.err +} + +func runCorrelate(t *testing.T, tool tools.Tool, args map[string]any) (tools.Observation, error) { + t.Helper() + raw, err := json.Marshal(args) + if err != nil { + t.Fatalf("marshal args: %v", err) + } + return tool.Run(context.Background(), json.RawMessage(raw)) +} + +// TestCorrelate_MergesNewestFirst: events from several sources are merged into +// one newest-first timeline. +func TestCorrelate_MergesNewestFirst(t *testing.T) { + base := time.Date(2026, 5, 28, 12, 0, 0, 0, time.UTC) + k8s := fakeSource{name: "k8s", events: []change.ChangeEvent{ + {Time: base.Add(-2 * time.Hour), Kind: "rollout", Target: "app", Source: "k8s"}, + }} + docker := fakeSource{name: "docker", events: []change.ChangeEvent{ + {Time: base.Add(-1 * time.Hour), Kind: "container_restart", Target: "app", Source: "docker"}, + }} + argo := fakeSource{name: "argo", events: []change.ChangeEvent{ + {Time: base, Kind: "sync", Target: "app", After: "abc", Source: "argo"}, + }} + tool := newCorrelateTool(k8s, docker, argo) + obs, err := runCorrelate(t, tool, map[string]any{"workload": "app", "since": "24h"}) + if err != nil { + t.Fatalf("Run: %v", err) + } + lines := strings.Split(strings.TrimSpace(obs.Text), "\n") + // Line 0 is the header; the first event line should be the argo sync (newest). + if len(lines) < 2 || !strings.Contains(lines[1], "argo") || !strings.Contains(lines[1], "sync") { + t.Errorf("expected newest (argo/sync) first, got:\n%s", obs.Text) + } +} + +// TestCorrelate_CandidateCauseSkipsEvent: the candidate cause is the newest +// state-altering event, skipping "event" kinds even when they are newer. 
+func TestCorrelate_CandidateCauseSkipsEvent(t *testing.T) { + base := time.Date(2026, 5, 28, 12, 0, 0, 0, time.UTC) + src := fakeSource{name: "k8s", events: []change.ChangeEvent{ + {Time: base, Kind: "event", Target: "app", Summary: "BackOff", Source: "k8s"}, + {Time: base.Add(-1 * time.Hour), Kind: "image", Target: "app", Before: "v1", After: "v2", Source: "k8s"}, + {Time: base.Add(-2 * time.Hour), Kind: "scale", Target: "app", Source: "k8s"}, + }} + tool := newCorrelateTool(src) + obs, err := runCorrelate(t, tool, map[string]any{"workload": "app", "since": "24h"}) + if err != nil { + t.Fatalf("Run: %v", err) + } + if !strings.Contains(obs.Text, "candidate cause: k8s image") { + t.Errorf("expected candidate cause to be the image change (skipping the newer event), got:\n%s", obs.Text) + } + if strings.Contains(obs.Text, "candidate cause: k8s event") { + t.Errorf("candidate cause must not be an 'event' kind, got:\n%s", obs.Text) + } +} + +// TestCorrelate_CandidateCausePicksSync: an Argo sync qualifies as a candidate +// cause and the newest qualifying event wins. +func TestCorrelate_CandidateCausePicksSync(t *testing.T) { + base := time.Date(2026, 5, 28, 12, 0, 0, 0, time.UTC) + src := fakeSource{name: "merged", events: []change.ChangeEvent{ + {Time: base, Kind: "sync", Target: "app", After: "deadbeef", Source: "argo"}, + {Time: base.Add(-3 * time.Hour), Kind: "rollout", Target: "app", Source: "k8s"}, + }} + tool := newCorrelateTool(src) + obs, err := runCorrelate(t, tool, map[string]any{"workload": "app", "since": "24h"}) + if err != nil { + t.Fatalf("Run: %v", err) + } + if !strings.Contains(obs.Text, "candidate cause: argo sync") { + t.Errorf("expected candidate cause to be the newest argo sync, got:\n%s", obs.Text) + } +} + +// TestCorrelate_CandidateCauseNone: with only "event" kinds in the window there +// is no state-altering change to blame. 
+func TestCorrelate_CandidateCauseNone(t *testing.T) { + base := time.Date(2026, 5, 28, 12, 0, 0, 0, time.UTC) + src := fakeSource{name: "k8s", events: []change.ChangeEvent{ + {Time: base, Kind: "event", Target: "app", Summary: "Unhealthy", Source: "k8s"}, + }} + tool := newCorrelateTool(src) + obs, err := runCorrelate(t, tool, map[string]any{"workload": "app"}) + if err != nil { + t.Fatalf("Run: %v", err) + } + if !strings.Contains(obs.Text, "candidate cause: none") { + t.Errorf("expected 'candidate cause: none', got:\n%s", obs.Text) + } +} + +// TestCorrelate_PartialFailure: one source errors, the others succeed. The tool +// must NOT error — it returns the working sources' events plus a note. +func TestCorrelate_PartialFailure(t *testing.T) { + base := time.Date(2026, 5, 28, 12, 0, 0, 0, time.UTC) + good := fakeSource{name: "docker", events: []change.ChangeEvent{ + {Time: base, Kind: "image", Target: "app", Source: "docker"}, + }} + bad := fakeSource{name: "argo", err: errors.New("argo unreachable")} + tool := newCorrelateTool(bad, good) + obs, err := runCorrelate(t, tool, map[string]any{"workload": "app"}) + if err != nil { + t.Fatalf("partial failure must not error: %v", err) + } + if !strings.Contains(obs.Text, "image") { + t.Errorf("expected the working source's event, got:\n%s", obs.Text) + } + if !strings.Contains(obs.Text, "note:") || !strings.Contains(obs.Text, "argo") { + t.Errorf("expected a failure note naming argo, got:\n%s", obs.Text) + } +} + +// TestCorrelate_AllSourcesFail: every source errors → the tool returns an error +// naming each failed source. 
+func TestCorrelate_AllSourcesFail(t *testing.T) { + a := fakeSource{name: "k8s", err: errors.New("boom-k8s")} + b := fakeSource{name: "argo", err: errors.New("boom-argo")} + tool := newCorrelateTool(a, b) + _, err := runCorrelate(t, tool, map[string]any{"workload": "app"}) + if err == nil { + t.Fatal("expected an error when all sources fail") + } + if !strings.Contains(err.Error(), "k8s") || !strings.Contains(err.Error(), "argo") { + t.Errorf("error should name both failed sources, got: %v", err) + } +} + +// TestCorrelate_WorkloadRequired: missing workload yields a guidance +// observation, not an error. +func TestCorrelate_WorkloadRequired(t *testing.T) { + tool := newCorrelateTool(fakeSource{name: "docker"}) + obs, err := runCorrelate(t, tool, map[string]any{"limit": 5}) + if err != nil { + t.Fatalf("missing workload should not error: %v", err) + } + if !strings.Contains(obs.Text, "workload is required") { + t.Errorf("expected 'workload is required', got: %s", obs.Text) + } +} + +// TestCorrelate_RiskLow pins the RiskRated contract. +func TestCorrelate_RiskLow(t *testing.T) { + tool := newCorrelateTool(fakeSource{name: "k8s"}) + if got := tools.RiskOf(tool); got != tools.RiskLow { + t.Errorf("RiskOf = %v, want RiskLow", got) + } +} diff --git a/internal/core/tools/gitops/argocd.go b/internal/core/tools/gitops/argocd.go index 4ebd57f..d87e225 100644 --- a/internal/core/tools/gitops/argocd.go +++ b/internal/core/tools/gitops/argocd.go @@ -424,6 +424,19 @@ type ArgoHistoryEntry struct { Source string `json:"source,omitempty"` } +// AppHistory returns app's recent sync history, newest-first, by reading the +// application object's .status.history (the path argo_app_history already uses +// against every Argo CD version). Exported so cross-signal consumers — e.g. +// the correlate group — can fold sync events into a change timeline without +// re-implementing the v1 path layout. 
+func (c *ArgoClient) AppHistory(ctx context.Context, app string) ([]ArgoHistoryEntry, error) { + body, err := c.RawGet(ctx, "/api/v1/applications/"+url.PathEscape(app), nil) + if err != nil { + return nil, err + } + return parseArgoHistory(body) +} + func parseArgoHistory(body []byte) ([]ArgoHistoryEntry, error) { var it struct { Status struct { diff --git a/internal/wiring/skills_test.go b/internal/wiring/skills_test.go index 1e2ccce..a753542 100644 --- a/internal/wiring/skills_test.go +++ b/internal/wiring/skills_test.go @@ -59,6 +59,8 @@ var canonicalToolNames = []string{ "change.recent", "metric.container_stats", + + "correlate.workload", } // stubTool mirrors the helper in internal/tools/registry_test.go; copied here diff --git a/internal/wiring/tools.go b/internal/wiring/tools.go index d0f0a3b..1be2dc6 100644 --- a/internal/wiring/tools.go +++ b/internal/wiring/tools.go @@ -20,6 +20,7 @@ import ( "github.com/rlaope/cloudy/internal/core/tools" "github.com/rlaope/cloudy/internal/core/tools/alert" "github.com/rlaope/cloudy/internal/core/tools/change" + "github.com/rlaope/cloudy/internal/core/tools/correlate" "github.com/rlaope/cloudy/internal/core/tools/db" "github.com/rlaope/cloudy/internal/core/tools/dockerlog" "github.com/rlaope/cloudy/internal/core/tools/ebpf" @@ -165,6 +166,16 @@ func BuildRegistry(opts Options) (*tools.Registry, error) { reg.UnmarkSkipped("log") } + // correlate.* joins the change timeline (k8s + docker) with Argo CD sync + // history into one evidence chain. Register when at least one of those + // signal sources exists; skip the group only when none do. Reuses hub / + // dockerHub already built above and the Argo clients from the gitops pass. 
+ if hub == nil && dockerHub == nil && len(gitopsClients.Argo) == 0 { + reg.MarkSkipped("correlate", "no kubeconfig, docker hosts, or Argo CD endpoint configured") + } else { + correlate.RegisterAll(reg, hub, dockerHub, gitopsClients.Argo) + } + // Single Profile application point: namespace checker on the Hub plus // tool allow/deny filter on the returned registry. reg = permission.Apply(reg, opts.Profile, func(check func(string) error) {