diff --git a/internal/core/tools/correlate/cause.go b/internal/core/tools/correlate/cause.go new file mode 100644 index 0000000..9ec0e52 --- /dev/null +++ b/internal/core/tools/correlate/cause.go @@ -0,0 +1,106 @@ +package correlate + +import ( + "fmt" + "strings" + "time" + + "github.com/rlaope/cloudy/internal/core/tools/change" +) + +// changeKinds is the set of state-altering change event kinds used by +// candidateCauseV2 to distinguish a cause from a symptom or a bare event. +var changeKinds = map[string]bool{ + "image": true, + "rollout": true, + "scale": true, + "sync": true, + "container_restart": true, + "container_create": true, + "image_pull": true, +} + +// symptomKinds is the set of observable symptom event kinds. +var symptomKinds = map[string]bool{ + "log_error": true, + "metric_breach": true, + "trace_error": true, + "trace_slow": true, +} + +// candidateCauseV2 aligns a symptom with the change most likely to have caused +// it. events must be newest-first (MergeSorted output). +// +// 1. Find the earliest symptom event (smallest Time among symptomKinds). +// 2. If a symptom exists: find the most recent changeKinds event strictly +// before that symptom. +// - Found → "candidate cause: — preceded symptom " +// - Not found → "candidate cause: none — earliest symptom has no preceding change found in window" +// 3. No symptom → fall back to the most recent changeKinds event: +// "candidate cause: ". +// 4. No qualifying change at all → "candidate cause: none — no state-altering change in the window". +// +// Each / is rendered by describeEvent. +func candidateCauseV2(events []change.ChangeEvent) string { + // Find the earliest (oldest) symptom. + var earliestSymptom *change.ChangeEvent + for i := range events { + e := &events[i] + if !symptomKinds[e.Kind] { + continue + } + if earliestSymptom == nil || e.Time.Before(earliestSymptom.Time) { + earliestSymptom = e + } + } + + if earliestSymptom != nil { + // events is newest-first, so the first changeKinds entry with + // Time < symptom.Time is the most recent change before the symptom. + for i := range events { + e := &events[i] + if changeKinds[e.Kind] && e.Time.Before(earliestSymptom.Time) { + return fmt.Sprintf("candidate cause: %s — preceded symptom %s", + describeEvent(e), describeEvent(earliestSymptom)) + } + } + return fmt.Sprintf( + "candidate cause: none — earliest symptom %s has no preceding change found in window", + describeEvent(earliestSymptom), + ) + } + + // No symptom: fall back to the most recent change event. + for i := range events { + e := &events[i] + if changeKinds[e.Kind] { + return "candidate cause: " + describeEvent(e) + } + } + return "candidate cause: none — no state-altering change in the window" +} + +// describeEvent renders a compact human description of an event: +// " on ", plus its quoted summary and a before→after +// segment when present, suffixed with the UTC RFC3339 timestamp. Empty fields +// are omitted so there are no dangling separators. +func describeEvent(e *change.ChangeEvent) string { + var b strings.Builder + if e.Source != "" { + b.WriteString(e.Source) + b.WriteByte(' ') + } + b.WriteString(e.Kind) + if e.Target != "" { + b.WriteString(" on ") + b.WriteString(e.Target) + } + if e.Summary != "" { + fmt.Fprintf(&b, " %q", e.Summary) + } + if e.Before != "" || e.After != "" { + fmt.Fprintf(&b, " (%s→%s)", e.Before, e.After) + } + fmt.Fprintf(&b, " @ %s", e.Time.UTC().Format(time.RFC3339)) + return b.String() +} diff --git a/internal/core/tools/correlate/cause_test.go b/internal/core/tools/correlate/cause_test.go new file mode 100644 index 0000000..a87f1a2 --- /dev/null +++ b/internal/core/tools/correlate/cause_test.go @@ -0,0 +1,108 @@ +package correlate + +import ( + "strings" + "testing" + "time" + + "github.com/rlaope/cloudy/internal/core/tools/change" +) + +var ( + t0 = time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC) + t1 = t0.Add(time.Minute) + t2 = t0.Add(2 * time.Minute) + t3 = t0.Add(3 * time.Minute) +) + +func evt(kind, summary string, at time.Time) change.ChangeEvent { + return change.ChangeEvent{Kind: kind, Summary: summary, Time: at} +} + +// merge wraps MergeSorted with no limit, producing newest-first order. +func merge(events ...change.ChangeEvent) []change.ChangeEvent { + return change.MergeSorted(0, events) +} + +func TestCandidateCauseV2_SymptomWithPriorChange(t *testing.T) { + // metric_breach at T2, image at T1, scale at T0 — image is the most + // recent change before the symptom. + events := merge( + evt("metric_breach", "error rate spiked", t2), + evt("image", "deployed v2.1", t1), + evt("scale", "replicas 2→5", t0), + ) + got := candidateCauseV2(events) + if !strings.Contains(got, "deployed v2.1") { + t.Fatalf("expected image change summary in output, got: %s", got) + } + if !strings.Contains(got, "image") { + t.Fatalf("expected kind 'image' in output, got: %s", got) + } + if !strings.Contains(got, "error rate spiked") { + t.Fatalf("expected symptom summary in output, got: %s", got) + } + if !strings.HasPrefix(got, "candidate cause:") { + t.Fatalf("expected 'candidate cause:' prefix, got: %s", got) + } +} + +func TestCandidateCauseV2_SymptomWithoutPriorChange(t *testing.T) { + // log_error at T2, only change is at T3 (after the symptom). + events := merge( + evt("rollout", "deployed v3", t3), + evt("log_error", "OOM killed", t2), + ) + got := candidateCauseV2(events) + if !strings.Contains(got, "no preceding change found") { + t.Fatalf("expected no-preceding-change message, got: %s", got) + } + if !strings.Contains(got, "OOM killed") { + t.Fatalf("expected symptom summary in output, got: %s", got) + } +} + +func TestCandidateCauseV2_NoSymptomFallback(t *testing.T) { + // Only change events — falls back to v1: most recent change. + events := merge( + evt("rollout", "rolled out v1.9", t2), + evt("scale", "scaled down", t1), + ) + got := candidateCauseV2(events) + if !strings.HasPrefix(got, "candidate cause:") { + t.Fatalf("expected 'candidate cause:' prefix, got: %s", got) + } + if !strings.Contains(got, "rolled out v1.9") { + t.Fatalf("expected most recent change summary, got: %s", got) + } +} + +func TestCandidateCauseV2_Empty(t *testing.T) { + got := candidateCauseV2(nil) + if !strings.Contains(got, "candidate cause: none") { + t.Fatalf("expected 'candidate cause: none' for empty input, got: %s", got) + } +} + +func TestCandidateCauseV2_EarliestSymptomSelected(t *testing.T) { + // Two symptoms: trace_error at T2, metric_breach at T1 (earlier). + // One change at T0 (before both). Should align with the earliest symptom + // (metric_breach at T1) and the change at T0. + events := merge( + evt("trace_error", "slow span", t2), + evt("metric_breach", "cpu spike", t1), + evt("image", "deployed v5", t0), + ) + got := candidateCauseV2(events) + if !strings.Contains(got, "cpu spike") { + t.Fatalf("expected earliest symptom 'cpu spike' in output, got: %s", got) + } + if !strings.Contains(got, "deployed v5") { + t.Fatalf("expected change 'deployed v5' in output, got: %s", got) + } + // The change immediately before the earliest symptom (T0 < T1) is picked, + // not the one before the later symptom. + if strings.Contains(got, "slow span") { + t.Fatalf("should not reference later symptom 'slow span', got: %s", got) + } +} diff --git a/internal/core/tools/correlate/log_source.go b/internal/core/tools/correlate/log_source.go new file mode 100644 index 0000000..daca4f2 --- /dev/null +++ b/internal/core/tools/correlate/log_source.go @@ -0,0 +1,168 @@ +package correlate + +import ( + "context" + "encoding/json" + "fmt" + "net/url" + "strconv" + "strings" + "time" + + dockerclient "github.com/rlaope/cloudy/internal/clients/docker" + "github.com/rlaope/cloudy/internal/core/tools" + "github.com/rlaope/cloudy/internal/core/tools/change" + tlog "github.com/rlaope/cloudy/internal/core/tools/log" +) + +// logSource folds log error spikes onto the change timeline as symptoms: +// ChangeEvents whose Kind is "log_error" and Source is "log". It draws from +// Loki backends only — Docker container logs are already exposed by the +// separate log.container tool, so this source is a no-op when no Loki clients +// are configured. +type logSource struct { + logs tlog.Clients + dockerHub *dockerclient.Hub +} + +// newLogSource builds a logSource over the configured log backends. It returns +// nil — so callers can omit the source — when neither a Loki client nor a +// Docker hub is available to pull logs from. +func newLogSource(logs tlog.Clients, dockerHub *dockerclient.Hub) change.ChangeSource { + if len(logs.Loki) == 0 && dockerHub == nil { + return nil + } + return &logSource{logs: logs, dockerHub: dockerHub} +} + +func (s *logSource) Name() string { return "log" } + +// RecentChanges scans recent Loki logs for q.Workload and emits a "log_error" +// symptom event when error-level lines are found in the window. Docker +// container logs are handled by the dedicated log.container tool; this source +// focuses on Loki. If no Loki clients are configured, it returns (nil, nil). +func (s *logSource) RecentChanges(ctx context.Context, q change.ChangeQuery) ([]change.ChangeEvent, error) { + // No Loki clients — nothing to do. Docker logs are out of scope here. + if len(s.logs.Loki) == 0 { + return nil, nil + } + + client, err := tools.PickEndpoint(s.logs.Loki, q.Context, "correlate", "loki endpoint") + if err != nil { + return nil, err + } + + since := q.Since + if since == 0 { + since = time.Hour + } + now := time.Now() + end := now.Unix() + start := now.Add(-since).Unix() + + // LogQL selector: {app=""} is the canonical k8s-via-Promtail + // label. v2 supports only the `app` label scheme and is namespace-agnostic; + // clusters using a different label (container/pod/service_name) or needing + // namespace scoping should query Loki via the log.* tools directly. %q + // escapes the workload so it cannot break out of the selector string. + logqlQuery := fmt.Sprintf(`{app=%q} |~ "(?i)(error|fatal|panic)"`, q.Workload) + + params := url.Values{ + "query": {logqlQuery}, + "start": {strconv.FormatInt(start*int64(time.Second), 10)}, + "end": {strconv.FormatInt(end*int64(time.Second), 10)}, + "limit": {"5000"}, + } + + body, err := client.RawGet(ctx, "/loki/api/v1/query_range", params) + if err != nil { + return nil, fmt.Errorf("log_source: loki query: %w", err) + } + + tsLines, err := parseLokiTimestamped(body) + if err != nil { + return nil, fmt.Errorf("log_source: parse: %w", err) + } + + events := logErrorEvents(tsLines, q.Workload) + return events, nil +} + +// TimestampedLine is a single log line with its nanosecond-precision timestamp +// decoded from the Loki values array. It is exported at package level so the +// pure helper functions are directly testable without a live Loki client. +type TimestampedLine struct { + Time time.Time + Text string +} + +// parseLokiTimestamped decodes the Loki query_range envelope and returns lines +// with real time.Time values (parsed from nanosecond Unix strings). +func parseLokiTimestamped(body []byte) ([]TimestampedLine, error) { + var env struct { + Data struct { + Result []struct { + Values [][2]string `json:"values"` + } `json:"result"` + } `json:"data"` + } + if err := json.Unmarshal(body, &env); err != nil { + return nil, err + } + var out []TimestampedLine + for _, s := range env.Data.Result { + for _, v := range s.Values { + ns, err := strconv.ParseInt(v[0], 10, 64) + if err != nil { + continue + } + out = append(out, TimestampedLine{ + Time: time.Unix(0, ns), + Text: v[1], + }) + } + } + return out, nil +} + +// isErrorLine reports whether a log line indicates an error condition. +// The check is case-insensitive; blank lines are never considered errors. +func isErrorLine(line string) bool { + if strings.TrimSpace(line) == "" { + return false + } + lower := strings.ToLower(line) + return strings.Contains(lower, "error") || + strings.Contains(lower, "fatal") || + strings.Contains(lower, "panic") || + strings.Contains(lower, "level=error") +} + +// logErrorEvents scans lines for error-level entries and, when at least one is +// found, emits a single ChangeEvent anchored at the earliest error line's +// timestamp. The Summary records the total error-line count in the window. +func logErrorEvents(lines []TimestampedLine, workload string) []change.ChangeEvent { + var earliest time.Time + count := 0 + for _, l := range lines { + if !isErrorLine(l.Text) { + continue + } + count++ + if earliest.IsZero() || l.Time.Before(earliest) { + earliest = l.Time + } + } + if count == 0 { + return nil + } + return []change.ChangeEvent{ + { + Time: earliest, + Kind: "log_error", + Source: "log", + Target: workload, + Summary: fmt.Sprintf("%d error line(s) in window", count), + }, + } +} diff --git a/internal/core/tools/correlate/log_source_test.go b/internal/core/tools/correlate/log_source_test.go new file mode 100644 index 0000000..70c877b --- /dev/null +++ b/internal/core/tools/correlate/log_source_test.go @@ -0,0 +1,82 @@ +package correlate + +import ( + "testing" + "time" +) + +func TestIsErrorLine(t *testing.T) { + cases := []struct { + line string + want bool + }{ + {"all good", false}, + {"level=error msg=something", true}, + {"", false}, + {"Fatal: out of memory", true}, + {"panic: runtime error", true}, + {"ERROR: connection refused", true}, + {" ", false}, + {"info: starting server", false}, + } + for _, c := range cases { + got := isErrorLine(c.line) + if got != c.want { + t.Errorf("isErrorLine(%q) = %v, want %v", c.line, got, c.want) + } + } +} + +func TestLogErrorEvents_WithErrors(t *testing.T) { + t0 := time.Unix(1000, 0) + t1 := time.Unix(2000, 0) // later + t2 := time.Unix(500, 0) // earliest + + lines := []TimestampedLine{ + {Time: t0, Text: "info: all good"}, + {Time: t1, Text: "ERROR x"}, + {Time: t2, Text: "panic y"}, + {Time: t0, Text: "debug: nothing here"}, + } + + events := logErrorEvents(lines, "myapp") + + if len(events) != 1 { + t.Fatalf("expected 1 event, got %d", len(events)) + } + e := events[0] + if e.Kind != "log_error" { + t.Errorf("Kind = %q, want %q", e.Kind, "log_error") + } + if e.Source != "log" { + t.Errorf("Source = %q, want %q", e.Source, "log") + } + if e.Target != "myapp" { + t.Errorf("Target = %q, want %q", e.Target, "myapp") + } + if !e.Time.Equal(t2) { + t.Errorf("Time = %v, want earliest error time %v", e.Time, t2) + } + // Summary must mention the count (2 error lines: "ERROR x" and "panic y") + if e.Summary != "2 error line(s) in window" { + t.Errorf("Summary = %q, want %q", e.Summary, "2 error line(s) in window") + } +} + +func TestLogErrorEvents_NoErrors(t *testing.T) { + lines := []TimestampedLine{ + {Time: time.Unix(1000, 0), Text: "info: server started"}, + {Time: time.Unix(2000, 0), Text: "debug: request received"}, + } + events := logErrorEvents(lines, "myapp") + if len(events) != 0 { + t.Errorf("expected 0 events, got %d", len(events)) + } +} + +func TestLogErrorEvents_Empty(t *testing.T) { + events := logErrorEvents(nil, "myapp") + if len(events) != 0 { + t.Errorf("expected 0 events for nil input, got %d", len(events)) + } +} diff --git a/internal/core/tools/correlate/metric_source.go b/internal/core/tools/correlate/metric_source.go new file mode 100644 index 0000000..8257715 --- /dev/null +++ b/internal/core/tools/correlate/metric_source.go @@ -0,0 +1,120 @@ +package correlate + +import ( + "context" + "fmt" + "time" + + promclient "github.com/rlaope/cloudy/internal/clients/prom" + "github.com/rlaope/cloudy/internal/core/tools" + "github.com/rlaope/cloudy/internal/core/tools/change" +) + +// metricSource folds a PromQL breach onto the change timeline as a symptom: a +// ChangeEvent whose Kind is "metric_breach" and Source is "metric". The query +// and threshold are per-call args (see correlateTool.Run), so this source is +// built at Run time, not registration. +type metricSource struct { + clients map[string]*promclient.Client + query string + threshold float64 +} + +// newMetricSource builds a metricSource over the configured Prometheus +// clients. It returns nil — so callers can omit the source — when there is no +// query to run or no Prometheus backend is wired. +func newMetricSource(prom map[string]*promclient.Client, query string, threshold float64) change.ChangeSource { + if query == "" || len(prom) == 0 { + return nil + } + return &metricSource{clients: prom, query: query, threshold: threshold} +} + +func (s *metricSource) Name() string { return "metric" } + +// RecentChanges range-queries the PromQL expression over the window derived +// from q.Since and emits a single "metric_breach" ChangeEvent at the earliest +// timestamp where the value exceeds the threshold. An empty q.Context resolves +// to the single configured endpoint (or errors when ambiguous). +func (s *metricSource) RecentChanges(ctx context.Context, q change.ChangeQuery) ([]change.ChangeEvent, error) { + client, err := tools.PickEndpoint(s.clients, q.Context, "correlate", "prometheus endpoint") + if err != nil { + return nil, err + } + + end := time.Now() + window := q.Since + if window <= 0 { + window = time.Hour + } + start := end.Add(-window) + + // Pick a step that yields at most ~200 points; floor at 15 s. + step := window / 200 + if step < 15*time.Second { + step = 15 * time.Second + } + + res, err := client.QueryRange(ctx, s.query, start, end, step) + if err != nil { + return nil, err + } + + return metricBreachEvents(res, s.threshold, s.query), nil +} + +// metricBreachEvents scans a Prometheus matrix result and returns a slice +// containing at most one ChangeEvent — at the earliest timestamp across all +// series where value > threshold. Values[i][0] is a Unix timestamp in seconds +// (float64); it is converted to time.Time via time.Unix with nanosecond +// precision for sub-second steps. +func metricBreachEvents(res *promclient.Result, threshold float64, query string) []change.ChangeEvent { + if res == nil || len(res.Matrix) == 0 { + return nil + } + + var ( + breachTime time.Time + breachValue float64 + peakValue float64 + havePeak bool + found bool + ) + + for _, series := range res.Matrix { + for _, pt := range series.Values { + tsSec := pt[0] + val := pt[1] + t := time.Unix(int64(tsSec), int64((tsSec-float64(int64(tsSec)))*1e9)) + + // Seed the peak from the first sample so a metric whose values are + // all negative still reports a real peak (not a spurious 0). + if !havePeak || val > peakValue { + peakValue = val + havePeak = true + } + if val > threshold { + if !found || t.Before(breachTime) { + breachTime = t + breachValue = val + found = true + } + } + } + } + + if !found { + return nil + } + + return []change.ChangeEvent{ + { + Time: breachTime, + Kind: "metric_breach", + Target: query, + Summary: fmt.Sprintf("metric exceeded threshold %.4g (breach value %.4g, peak %.4g)", threshold, breachValue, peakValue), + After: fmt.Sprintf("%.4g", breachValue), + Source: "metric", + }, + } +} diff --git a/internal/core/tools/correlate/metric_source_test.go b/internal/core/tools/correlate/metric_source_test.go new file mode 100644 index 0000000..7ed76bb --- /dev/null +++ b/internal/core/tools/correlate/metric_source_test.go @@ -0,0 +1,142 @@ +package correlate + +import ( + "testing" + "time" + + promclient "github.com/rlaope/cloudy/internal/clients/prom" +) + +func TestMetricBreachEvents_SingleBreach(t *testing.T) { + // Series crosses threshold at T=1000 (value 5.0 > threshold 4.0). + res := &promclient.Result{ + ResultType: "matrix", + Matrix: []promclient.Series{ + { + Labels: map[string]string{"job": "test"}, + Values: [][2]float64{ + {900, 1.0}, + {950, 3.9}, + {1000, 5.0}, + {1050, 6.0}, + }, + }, + }, + } + + events := metricBreachEvents(res, 4.0, "test_query") + if len(events) != 1 { + t.Fatalf("expected 1 event, got %d", len(events)) + } + e := events[0] + if e.Kind != "metric_breach" { + t.Errorf("kind: got %q, want %q", e.Kind, "metric_breach") + } + if e.Source != "metric" { + t.Errorf("source: got %q, want %q", e.Source, "metric") + } + want := time.Unix(1000, 0) + if !e.Time.Equal(want) { + t.Errorf("time: got %v, want %v", e.Time, want) + } +} + +func TestMetricBreachEvents_EarliestAcrossMultipleSeries(t *testing.T) { + // Series A breaches at T=2000; Series B breaches earlier at T=1500. + // Expect event time == 1500. + res := &promclient.Result{ + ResultType: "matrix", + Matrix: []promclient.Series{ + { + Labels: map[string]string{"instance": "a"}, + Values: [][2]float64{ + {1000, 0.5}, + {2000, 10.0}, + }, + }, + { + Labels: map[string]string{"instance": "b"}, + Values: [][2]float64{ + {1000, 0.1}, + {1500, 7.0}, + {2000, 8.0}, + }, + }, + }, + } + + events := metricBreachEvents(res, 5.0, "multi_series") + if len(events) != 1 { + t.Fatalf("expected 1 event, got %d", len(events)) + } + want := time.Unix(1500, 0) + if !events[0].Time.Equal(want) { + t.Errorf("time: got %v, want %v", events[0].Time, want) + } +} + +func TestMetricBreachEvents_NoBreach(t *testing.T) { + res := &promclient.Result{ + ResultType: "matrix", + Matrix: []promclient.Series{ + { + Labels: map[string]string{"job": "test"}, + Values: [][2]float64{ + {1000, 1.0}, + {2000, 2.0}, + {3000, 3.0}, + }, + }, + }, + } + + events := metricBreachEvents(res, 5.0, "no_breach") + if len(events) != 0 { + t.Errorf("expected 0 events, got %d", len(events)) + } +} + +func TestMetricBreachEvents_EmptyResult(t *testing.T) { + if events := metricBreachEvents(nil, 1.0, "q"); len(events) != 0 { + t.Errorf("nil result: expected 0 events, got %d", len(events)) + } + if events := metricBreachEvents(&promclient.Result{}, 1.0, "q"); len(events) != 0 { + t.Errorf("empty matrix: expected 0 events, got %d", len(events)) + } +} + +func TestMetricBreachEvents_ThresholdZeroSemantics(t *testing.T) { + // value == 0 must NOT breach (strictly >); value > 0 must breach. + res := &promclient.Result{ + ResultType: "matrix", + Matrix: []promclient.Series{ + { + Labels: map[string]string{}, + Values: [][2]float64{ + {100, 0.0}, + {200, 0.001}, + }, + }, + }, + } + + events := metricBreachEvents(res, 0.0, "zero_threshold") + if len(events) != 1 { + t.Fatalf("expected 1 event (value 0.001 > 0), got %d", len(events)) + } + want := time.Unix(200, 0) + if !events[0].Time.Equal(want) { + t.Errorf("time: got %v, want %v", events[0].Time, want) + } + + // Confirm value exactly == 0 yields no breach. + resZero := &promclient.Result{ + ResultType: "matrix", + Matrix: []promclient.Series{ + {Labels: map[string]string{}, Values: [][2]float64{{100, 0.0}}}, + }, + } + if ev := metricBreachEvents(resZero, 0.0, "zero_value"); len(ev) != 0 { + t.Errorf("value==0 with threshold==0: expected no breach, got %d events", len(ev)) + } +} diff --git a/internal/core/tools/correlate/register.go b/internal/core/tools/correlate/register.go index a65da0d..7a56eeb 100644 --- a/internal/core/tools/correlate/register.go +++ b/internal/core/tools/correlate/register.go @@ -3,17 +3,22 @@ package correlate import ( dockerclient "github.com/rlaope/cloudy/internal/clients/docker" k8sclient "github.com/rlaope/cloudy/internal/clients/k8s" + promclient "github.com/rlaope/cloudy/internal/clients/prom" "github.com/rlaope/cloudy/internal/core/tools" "github.com/rlaope/cloudy/internal/core/tools/change" "github.com/rlaope/cloudy/internal/core/tools/gitops" + tlog "github.com/rlaope/cloudy/internal/core/tools/log" + "github.com/rlaope/cloudy/internal/core/tools/trace" ) -// RegisterAll adds the correlate.workload tool to reg, bound to whichever -// signal sources are available: a k8s change source when k8sHub is non-nil, a -// docker change source when dockerHub is non-nil, and an Argo CD sync source -// when at least one Argo client is wired. With no source available this is a -// no-op — the wiring layer marks the "correlate" group skipped instead. -func RegisterAll(reg *tools.Registry, k8sHub *k8sclient.Hub, dockerHub *dockerclient.Hub, argo map[string]*gitops.ArgoClient) { +// RegisterAll adds the correlate.workload tool to reg. The change sources are +// fixed at registration: a k8s change source when k8sHub is non-nil, a docker +// change source when dockerHub is non-nil, and an Argo CD sync source when at +// least one Argo client is wired. The symptom backends (prom/logs/traces) are +// threaded through so the tool can build metric/log/trace symptom sources from +// per-call args at Run time. With no change source AND no symptom backend this +// is a no-op — the wiring layer marks the "correlate" group skipped instead. +func RegisterAll(reg *tools.Registry, k8sHub *k8sclient.Hub, dockerHub *dockerclient.Hub, argo map[string]*gitops.ArgoClient, prom map[string]*promclient.Client, logs tlog.Clients, traces trace.Clients) { var sources []change.ChangeSource if k8sHub != nil { sources = append(sources, change.NewK8sSource(k8sHub)) @@ -24,8 +29,11 @@ func RegisterAll(reg *tools.Registry, k8sHub *k8sclient.Hub, dockerHub *dockercl if src := newArgoSource(argo); src != nil { sources = append(sources, src) } - if len(sources) == 0 { + // Register when any change source OR any symptom backend exists; a + // symptom-only setup (e.g. prom + loki, no k8s/docker/argo) is still valid. + hasSymptom := len(prom) > 0 || len(logs.Loki) > 0 || len(traces.Jaeger) > 0 + if len(sources) == 0 && !hasSymptom { return } - reg.MustRegister(newCorrelateTool(sources...)) + reg.MustRegister(NewWorkloadTool(sources, prom, logs, traces, dockerHub)) } diff --git a/internal/core/tools/correlate/tool.go b/internal/core/tools/correlate/tool.go index ddd88f1..f85fc8f 100644 --- a/internal/core/tools/correlate/tool.go +++ b/internal/core/tools/correlate/tool.go @@ -7,8 +7,12 @@ import ( "strings" "time" + dockerclient "github.com/rlaope/cloudy/internal/clients/docker" + promclient "github.com/rlaope/cloudy/internal/clients/prom" "github.com/rlaope/cloudy/internal/core/tools" "github.com/rlaope/cloudy/internal/core/tools/change" + tlog "github.com/rlaope/cloudy/internal/core/tools/log" + "github.com/rlaope/cloudy/internal/core/tools/trace" ) // defaultSince is used when the caller omits `since` or supplies a value that @@ -18,46 +22,58 @@ const defaultSince = time.Hour // defaultLimit caps the merged timeline when the caller omits `limit`. const defaultLimit = 50 -// causeKinds is the set of state-altering change kinds that can plausibly be -// the cause behind a symptom. "event" is excluded: a Kubernetes event is a -// report of a condition, not the change that produced it. -var causeKinds = map[string]bool{ - "image": true, - "rollout": true, - "scale": true, - "sync": true, - "container_restart": true, - "container_create": true, - "image_pull": true, -} - type correlateArgs struct { - Workload string `json:"workload"` - Namespace string `json:"namespace"` - Context string `json:"context"` - Since string `json:"since"` - Limit int `json:"limit"` + Workload string `json:"workload"` + Namespace string `json:"namespace"` + Context string `json:"context"` + Since string `json:"since"` + Limit int `json:"limit"` + MetricQuery string `json:"metric_query"` + MetricThreshold float64 `json:"metric_threshold"` } -// correlateTool joins the available change sources (k8s, docker, argo) into one -// newest-first evidence timeline for a workload and names the most recent -// state-altering event as the likeliest correlate of a current symptom. It is -// hand-written rather than built via Spec[Args] so it can advertise RiskLow -// through the RiskRated interface. +// correlateTool joins the available change sources (k8s, docker, argo) and the +// symptom sources (metric/log/trace) into one newest-first evidence timeline +// for a workload, and names the most recent state-altering event as the +// likeliest correlate of a current symptom. It is hand-written rather than +// built via Spec[Args] so it can advertise RiskLow through the RiskRated +// interface. +// +// changeSources are fixed at registration; symptom sources are built per-Run +// because the metric query/threshold arrive as call args. type correlateTool struct { - sources []change.ChangeSource + changeSources []change.ChangeSource + prom map[string]*promclient.Client + logs tlog.Clients + traces trace.Clients + dockerHub *dockerclient.Hub +} + +// NewWorkloadTool returns the correlate.workload tool. changeSources are the +// registration-time sources (k8s/docker/argo); the remaining deps let Run build +// the metric/log/trace symptom sources from per-call args. Any dep may be +// zero/nil — the matching symptom source is simply omitted. +func NewWorkloadTool(changeSources []change.ChangeSource, prom map[string]*promclient.Client, logs tlog.Clients, traces trace.Clients, dockerHub *dockerclient.Hub) tools.Tool { + return &correlateTool{ + changeSources: changeSources, + prom: prom, + logs: logs, + traces: traces, + dockerHub: dockerHub, + } } -// newCorrelateTool returns the correlate.workload tool bound to the supplied -// sources. At least one source is expected. +// newCorrelateTool returns the correlate.workload tool bound only to the +// supplied change sources, with no symptom backends. Used in tests; production +// wiring goes through NewWorkloadTool. func newCorrelateTool(sources ...change.ChangeSource) tools.Tool { - return &correlateTool{sources: sources} + return &correlateTool{changeSources: sources} } func (t *correlateTool) Name() string { return "correlate.workload" } func (t *correlateTool) Description() string { - return "Correlate recent changes for a workload across signals — Kubernetes/Docker change history and Argo CD sync history — into one newest-first evidence timeline, and name the most recent state-altering event as the likeliest cause of a current symptom. Read-only." + return "Correlate recent changes and symptoms for a workload across signals — Kubernetes/Docker change history, Argo CD sync history, plus metric/log/trace symptoms — into one newest-first evidence timeline, and name the most recent state-altering event as the likeliest cause of a current symptom. Supply metric_query (PromQL) with metric_threshold to fold a metric breach onto the timeline; log and trace symptoms are added automatically when those backends are configured. Read-only." } func (t *correlateTool) Schema() json.RawMessage { @@ -65,11 +81,13 @@ func (t *correlateTool) Schema() json.RawMessage { s := map[string]any{ "type": "object", "properties": map[string]any{ - "workload": str("Workload to correlate (deployment/statefulset/daemonset, container/compose service, or Argo CD application name). Required."), - "namespace": str("Kubernetes namespace to scope the search; ignored by the Docker and Argo sources."), - "context": str("kubeconfig context, Docker host, or Argo CD endpoint name to query; empty = each backend's default."), - "since": str("How far back to look, as a Go duration (e.g. \"1h\", \"90m\"); default \"1h\"."), - "limit": map[string]any{"type": "integer", "description": "Maximum number of timeline events to return; default 50."}, + "workload": str("Workload to correlate (deployment/statefulset/daemonset, container/compose service, or Argo CD application name). Required."), + "namespace": str("Kubernetes namespace to scope the change search; ignored by the Docker and Argo change sources and by the metric/log/trace symptom sources (namespace-agnostic in v2 — scope symptoms via metric_query or workload/service naming)."), + "context": str("kubeconfig context, Docker host, or Argo CD endpoint name to query; empty = each backend's default."), + "since": str("How far back to look, as a Go duration (e.g. \"1h\", \"90m\"); default \"1h\"."), + "limit": map[string]any{"type": "integer", "description": "Maximum number of timeline events to return; default 50."}, + "metric_query": str("PromQL query whose breaches are folded onto the timeline as metric symptom events; empty = no metric symptom."), + "metric_threshold": map[string]any{"type": "number", "description": "Value a metric_query sample must exceed to count as a breach; default 0 (i.e. value > 0)."}, }, "required": []string{"workload"}, } @@ -106,7 +124,22 @@ func (t *correlateTool) Run(ctx context.Context, raw json.RawMessage) (tools.Obs limit = a.Limit } - if len(t.sources) == 0 { + // Symptom sources are built per-call because the metric query/threshold are + // call args. nil constructors (absent backend/inputs) are skipped, then the + // rest join the fixed change sources on one timeline. Copy first so appends + // never mutate the shared t.changeSources backing array across calls. + sources := append([]change.ChangeSource(nil), t.changeSources...) + if src := newMetricSource(t.prom, a.MetricQuery, a.MetricThreshold); src != nil { + sources = append(sources, src) + } + if src := newLogSource(t.logs, t.dockerHub); src != nil { + sources = append(sources, src) + } + if src := newTraceSource(t.traces); src != nil { + sources = append(sources, src) + } + + if len(sources) == 0 { return tools.Observation{Text: "correlate.workload: no change sources available"}, nil } @@ -120,7 +153,7 @@ func (t *correlateTool) Run(ctx context.Context, raw json.RawMessage) (tools.Obs var groups [][]change.ChangeEvent var failures []string - for _, src := range t.sources { + for _, src := range sources { events, err := src.RecentChanges(ctx, q) if err != nil { failures = append(failures, fmt.Sprintf("%s: %v", src.Name(), err)) @@ -141,18 +174,6 @@ func (t *correlateTool) Run(ctx context.Context, raw json.RawMessage) (tools.Obs }, nil } -// candidateCause returns the newest event whose Kind is state-altering (in -// causeKinds), i.e. the likeliest correlate of a current symptom. events must -// already be newest-first (MergeSorted output). Returns nil when none qualify. -func candidateCause(events []change.ChangeEvent) *change.ChangeEvent { - for i := range events { - if causeKinds[events[i].Kind] { - return &events[i] - } - } - return nil -} - // renderCorrelation formats the evidence timeline newest-first followed by the // candidate-cause line. Each timeline line is // "RFC3339 | source | kind | target | summary | before→after"; the before→after @@ -169,16 +190,8 @@ func renderCorrelation(workload string, since time.Duration, events []change.Cha } b.WriteByte('\n') } - if cause := candidateCause(events); cause != nil { - fmt.Fprintf(&b, "candidate cause: %s %s on %s @ %s", - cause.Source, cause.Kind, cause.Target, cause.Time.UTC().Format(time.RFC3339)) - if cause.Before != "" || cause.After != "" { - fmt.Fprintf(&b, " (%s→%s)", cause.Before, cause.After) - } - b.WriteByte('\n') - } else { - b.WriteString("candidate cause: none — no state-altering change in the window\n") - } + b.WriteString(candidateCauseV2(events)) + b.WriteByte('\n') if len(failures) > 0 { fmt.Fprintf(&b, "note: %d source(s) failed: %s\n", len(failures), strings.Join(failures, "; ")) } diff --git a/internal/core/tools/correlate/tool_test.go b/internal/core/tools/correlate/tool_test.go index 8c719cb..46dc71c 100644 --- a/internal/core/tools/correlate/tool_test.go +++ b/internal/core/tools/correlate/tool_test.go @@ -59,6 +59,32 @@ func TestCorrelate_MergesNewestFirst(t *testing.T) { } } +// TestCorrelate_SymptomRendersAndAligns: a symptom-kind event from a symptom +// source renders on the unified timeline, and candidate-cause v2 names the +// change that preceded the symptom (not merely the newest change). +func TestCorrelate_SymptomRendersAndAligns(t *testing.T) { + base := time.Date(2026, 5, 28, 12, 0, 0, 0, time.UTC) + changeSrc := fakeSource{name: "k8s", events: []change.ChangeEvent{ + {Time: base.Add(-30 * time.Minute), Kind: "image", Target: "app", Before: "v1", After: "v2", Source: "k8s"}, + }} + symptomSrc := fakeSource{name: "metric", events: []change.ChangeEvent{ + {Time: base, Kind: "metric_breach", Target: "app", Summary: "error rate > 0.2", Source: "metric"}, + }} + tool := newCorrelateTool(changeSrc, symptomSrc) + obs, err := runCorrelate(t, tool, map[string]any{"workload": "app", "since": "24h"}) + if err != nil { + t.Fatalf("Run: %v", err) + } + // Symptom line is visible on the timeline. + if !strings.Contains(obs.Text, "metric_breach") || !strings.Contains(obs.Text, "error rate > 0.2") { + t.Errorf("expected metric_breach symptom line on the timeline, got:\n%s", obs.Text) + } + // Candidate cause aligns the image change before the symptom. + if !strings.Contains(obs.Text, "candidate cause:") || !strings.Contains(obs.Text, "image") || !strings.Contains(obs.Text, "preceded symptom") { + t.Errorf("expected candidate cause aligning the image change before the symptom, got:\n%s", obs.Text) + } +} + // TestCorrelate_CandidateCauseSkipsEvent: the candidate cause is the newest // state-altering event, skipping "event" kinds even when they are newer. func TestCorrelate_CandidateCauseSkipsEvent(t *testing.T) { diff --git a/internal/core/tools/correlate/trace_source.go b/internal/core/tools/correlate/trace_source.go new file mode 100644 index 0000000..1b3f141 --- /dev/null +++ b/internal/core/tools/correlate/trace_source.go @@ -0,0 +1,124 @@ +package correlate + +import ( + "context" + "fmt" + "time" + + "github.com/rlaope/cloudy/internal/core/tools" + "github.com/rlaope/cloudy/internal/core/tools/change" + "github.com/rlaope/cloudy/internal/core/tools/trace" +) + +// traceSlowThreshold is the hardcoded latency above which a non-error span is +// treated as a "trace_slow" symptom. 1s is a sane default for a request span; +// it is intentionally not configurable in v2. +const traceSlowThreshold = time.Second + +// traceSource folds trace errors and latency outliers onto the change timeline +// as symptoms: ChangeEvents whose Kind is "trace_error" / "trace_slow" and +// Source is "trace". It draws from the Jaeger backends. +// +// v2 covers Jaeger only. Tempo is deferred to v3: its search response is opaque +// (raw, schema-light) and would need its own span extraction, so a Tempo-only +// deployment yields no trace symptoms here (newTraceSource returns nil). +type traceSource struct { + traces trace.Clients +} + +// newTraceSource builds a traceSource over the configured tracing backends. It +// returns nil — so callers can omit the source — when no Jaeger client is wired +// (Tempo-only deployments are deferred to v3; see the type doc). +func newTraceSource(traces trace.Clients) change.ChangeSource { + if len(traces.Jaeger) == 0 { + return nil + } + return &traceSource{traces: traces} +} + +func (s *traceSource) Name() string { return "trace" } + +// RecentChanges searches recent Jaeger traces for q.Workload (used as the +// service name) over the window [now-Since, now] (default 1h), filtered to +// error spans via the `error=true` tag, and emits trace symptom events. An +// empty q.Context resolves to the single configured Jaeger endpoint (or errors +// when ambiguous). Per-source errors are returned for the caller to tolerate. +func (s *traceSource) RecentChanges(ctx context.Context, q change.ChangeQuery) ([]change.ChangeEvent, error) { + if len(s.traces.Jaeger) == 0 { + return nil, nil + } + client, err := tools.PickEndpoint(s.traces.Jaeger, q.Context, "correlate", "jaeger endpoint") + if err != nil { + return nil, err + } + + end := time.Now() + window := q.Since + if window <= 0 { + window = time.Hour + } + start := end.Add(-window) + + spans, err := client.SearchErrorSpans(ctx, q.Workload, `{"error":"true"}`, start, end, 100) + if err != nil { + return nil, err + } + return traceSymptomEvents(spans, q.Workload), nil +} + +// traceSymptomEvents converts Jaeger spans into trace symptom events. It is +// pure (no I/O) so it can be unit-tested with literal span slices. +// +// To avoid flooding the timeline it mirrors the metric/log "earliest onset" +// approach: it emits at most ONE "trace_error" event, at the earliest error +// span's start, carrying the total error-span count in its Summary. It also +// emits at most ONE "trace_slow" event for the earliest non-error span whose +// Duration exceeds traceSlowThreshold. All events use Source "trace" and Target +// workload. +func traceSymptomEvents(spans []trace.JaegerSpan, workload string) []change.ChangeEvent { + var ( + errCount int + earliestErr trace.JaegerSpan + haveErr bool + earliestSlow trace.JaegerSpan + haveSlow bool + ) + + for _, sp := range spans { + if sp.Error { + errCount++ + if !haveErr || sp.StartTime.Before(earliestErr.StartTime) { + earliestErr = sp + haveErr = true + } + continue + } + if sp.Duration > traceSlowThreshold { + if !haveSlow || sp.StartTime.Before(earliestSlow.StartTime) { + earliestSlow = sp + haveSlow = true + } + } + } + + var out []change.ChangeEvent + if haveErr { + out = append(out, change.ChangeEvent{ + Time: earliestErr.StartTime, + Kind: "trace_error", + Target: workload, + Summary: fmt.Sprintf("%s failed (%dms); %d error span(s) in window", earliestErr.Operation, earliestErr.Duration.Milliseconds(), errCount), + Source: "trace", + }) + } + if haveSlow { + out = append(out, change.ChangeEvent{ + Time: earliestSlow.StartTime, + Kind: "trace_slow", + Target: workload, + Summary: fmt.Sprintf("%s slow (%dms > %dms threshold)", earliestSlow.Operation, earliestSlow.Duration.Milliseconds(), traceSlowThreshold.Milliseconds()), + Source: "trace", + }) + } + return out +} diff --git a/internal/core/tools/correlate/trace_source_test.go b/internal/core/tools/correlate/trace_source_test.go new file mode 100644 index 0000000..db55a1a --- /dev/null +++ b/internal/core/tools/correlate/trace_source_test.go @@ -0,0 +1,81 @@ +package correlate + +import ( + "strings" + "testing" + "time" + + "github.com/rlaope/cloudy/internal/core/tools/trace" +) + +func TestTraceSymptomEvents(t *testing.T) { + base := time.Date(2026, 5, 28, 12, 0, 0, 0, time.UTC) + + t.Run("two error spans emit one trace_error at the earliest start", func(t *testing.T) { + spans := []trace.JaegerSpan{ + {StartTime: base.Add(30 * time.Second), Duration: 200 * time.Millisecond, Error: true, Operation: "POST /pay"}, + {StartTime: base.Add(5 * time.Second), Duration: 150 * time.Millisecond, Error: true, Operation: "POST /pay"}, + } + got := traceSymptomEvents(spans, "checkout") + if len(got) != 1 { + t.Fatalf("len = %d, want 1", len(got)) + } + e := got[0] + if e.Kind != "trace_error" { + t.Errorf("Kind = %q, want trace_error", e.Kind) + } + if !e.Time.Equal(base.Add(5 * time.Second)) { + t.Errorf("Time = %v, want earliest error span start %v", e.Time, base.Add(5*time.Second)) + } + if e.Target != "checkout" { + t.Errorf("Target = %q, want checkout", e.Target) + } + if e.Source != "trace" { + t.Errorf("Source = %q, want trace", e.Source) + } + if !strings.Contains(e.Summary, "2 error span") { + t.Errorf("Summary = %q, want it to report count=2", e.Summary) + } + }) + + t.Run("no error spans emit zero events", func(t *testing.T) { + spans := []trace.JaegerSpan{ + {StartTime: base, Duration: 50 * time.Millisecond, Error: false, Operation: "GET /health"}, + } + if got := traceSymptomEvents(spans, "checkout"); len(got) != 0 { + t.Fatalf("len = %d, want 0 (no error or slow spans)", len(got)) + } + }) + + t.Run("slow non-error span emits one trace_slow event", func(t *testing.T) { + spans := []trace.JaegerSpan{ + {StartTime: base, Duration: 2 * time.Second, Error: false, Operation: "GET /report"}, + } + got := traceSymptomEvents(spans, "checkout") + if len(got) != 1 { + t.Fatalf("len = %d, want 1", len(got)) + } + e := got[0] + if e.Kind != "trace_slow" { + t.Errorf("Kind = %q, want trace_slow", e.Kind) + } + if !e.Time.Equal(base) { + t.Errorf("Time = %v, want %v", e.Time, base) + } + if e.Source != "trace" { + t.Errorf("Source = %q, want trace", e.Source) + } + }) +} + +func TestNewTraceSource(t *testing.T) { + if newTraceSource(trace.Clients{}) != nil { + t.Error("newTraceSource with no clients should be nil") + } + if newTraceSource(trace.Clients{Tempo: map[string]*trace.TempoClient{"t": {}}}) != nil { + t.Error("newTraceSource with only Tempo should be nil (v3 deferred)") + } + if newTraceSource(trace.Clients{Jaeger: map[string]*trace.JaegerClient{"j": {}}}) == nil { + t.Error("newTraceSource with a Jaeger client should be non-nil") + } +} diff --git a/internal/core/tools/trace/jaeger.go b/internal/core/tools/trace/jaeger.go index 06a8dcc..c94c510 100644 --- a/internal/core/tools/trace/jaeger.go +++ b/internal/core/tools/trace/jaeger.go @@ -185,6 +185,89 @@ func newJaegerSearchTracesTool(clients map[string]*JaegerClient) tools.Tool { }.Build() } +// JaegerSpan is the minimal, read-only view of a single Jaeger span needed by +// correlation: when it started, how long it ran, whether it failed, and the +// operation that produced it. The existing parseJaegerSearch summary drops +// startTime and error tags, so this struct surfaces them for callers that fold +// traces onto a change timeline (see internal/core/tools/correlate). +type JaegerSpan struct { + StartTime time.Time + Duration time.Duration + Error bool + Operation string +} + +// SearchErrorSpans queries Jaeger's /api/traces for service over [start, end] +// and returns one JaegerSpan per matched span, decoding the raw startTime +// (microseconds since epoch) and duration (microseconds), plus an Error flag +// derived from the span's `error=true` tag. tags, when non-empty, is passed as +// the Jaeger tags filter (e.g. `{"error":"true"}`); limit caps matched traces. +// This is a list/get read only — it mutates nothing. +func (c *JaegerClient) SearchErrorSpans(ctx context.Context, service, tags string, start, end time.Time, limit int) ([]JaegerSpan, error) { + if limit <= 0 { + limit = 20 + } + params := url.Values{ + "service": {service}, + // Jaeger expects microseconds for start/end. + "start": {strconv.FormatInt(start.UnixMicro(), 10)}, + "end": {strconv.FormatInt(end.UnixMicro(), 10)}, + "limit": {strconv.Itoa(limit)}, + } + if tags != "" { + params.Set("tags", tags) + } + body, err := c.RawGet(ctx, "/api/traces", params) + if err != nil { + return nil, fmt.Errorf("trace.jaeger search spans: %w", err) + } + return parseJaegerSpans(body) +} + +// parseJaegerSpans flattens a Jaeger /api/traces response into JaegerSpans, +// reading the raw startTime + duration (both microseconds) and the `error` tag. +func parseJaegerSpans(body []byte) ([]JaegerSpan, error) { + var env struct { + Data []struct { + Spans []struct { + OperationName string `json:"operationName"` + StartTime int64 `json:"startTime"` + Duration int64 `json:"duration"` + Tags []struct { + Key string `json:"key"` + Value any `json:"value"` + } `json:"tags"` + } `json:"spans"` + } `json:"data"` + } + if err := json.Unmarshal(body, &env); err != nil { + return nil, err + } + var out []JaegerSpan + for _, t := range env.Data { + for _, sp := range t.Spans { + isErr := false + for _, tag := range sp.Tags { + if tag.Key == "error" { + switch v := tag.Value.(type) { + case bool: + isErr = v + case string: + isErr = v == "true" + } + } + } + out = append(out, JaegerSpan{ + StartTime: time.UnixMicro(sp.StartTime), + Duration: time.Duration(sp.Duration) * time.Microsecond, + Error: isErr, + Operation: sp.OperationName, + }) + } + } + return out, nil +} + // parseJaegerSearch produces a one-line summary per matched trace // (traceID, span count, duration µs, root service/operation). func parseJaegerSearch(body []byte) (string, *render.Table, any, error) { diff --git a/internal/wiring/tools.go b/internal/wiring/tools.go index 1be2dc6..15d820a 100644 --- a/internal/wiring/tools.go +++ b/internal/wiring/tools.go @@ -166,14 +166,17 @@ func BuildRegistry(opts Options) (*tools.Registry, error) { reg.UnmarkSkipped("log") } - // correlate.* joins the change timeline (k8s + docker) with Argo CD sync - // history into one evidence chain. Register when at least one of those - // signal sources exists; skip the group only when none do. Reuses hub / - // dockerHub already built above and the Argo clients from the gitops pass. - if hub == nil && dockerHub == nil && len(gitopsClients.Argo) == 0 { - reg.MarkSkipped("correlate", "no kubeconfig, docker hosts, or Argo CD endpoint configured") + // correlate.* joins the change timeline (k8s + docker + Argo CD sync) with + // metric/log/trace symptom signals into one evidence chain. Register when + // ANY signal source exists — change backends (k8s/docker/argo) or symptom + // backends (prom/loki/jaeger), since symptom-only setups are valid; skip the + // group only when none do. Reuses hub / dockerHub / Argo / prom / log / + // trace clients already built above. + if hub == nil && dockerHub == nil && len(gitopsClients.Argo) == 0 && + len(promClients) == 0 && len(logClients.Loki) == 0 && len(traceClients.Jaeger) == 0 { + reg.MarkSkipped("correlate", "no kubeconfig, docker hosts, Argo CD, Prometheus, Loki, or Jaeger endpoint configured") } else { - correlate.RegisterAll(reg, hub, dockerHub, gitopsClients.Argo) + correlate.RegisterAll(reg, hub, dockerHub, gitopsClients.Argo, promClients, logClients, traceClients) } // Single Profile application point: namespace checker on the Hub plus