Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 106 additions & 0 deletions internal/core/tools/correlate/cause.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package correlate

import (
"fmt"
"strings"
"time"

"github.com/rlaope/cloudy/internal/core/tools/change"
)

// changeKinds is the set of state-altering change event kinds used by
// candidateCauseV2 to distinguish a cause from a symptom or a bare event.
var changeKinds = map[string]bool{
"image": true,
"rollout": true,
"scale": true,
"sync": true,
"container_restart": true,
"container_create": true,
"image_pull": true,
}

// symptomKinds is the set of observable symptom event kinds.
var symptomKinds = map[string]bool{
"log_error": true,
"metric_breach": true,
"trace_error": true,
"trace_slow": true,
}

// candidateCauseV2 aligns a symptom with the change most likely to have caused
// it. events must be newest-first (MergeSorted output).
//
// 1. Find the earliest symptom event (smallest Time among symptomKinds).
// 2. If a symptom exists: find the most recent changeKinds event strictly
// before that symptom.
// - Found → "candidate cause: <change> — preceded symptom <symptom>"
// - Not found → "candidate cause: none — earliest symptom <symptom> has no preceding change found in window"
// 3. No symptom → fall back to the most recent changeKinds event:
// "candidate cause: <change>".
// 4. No qualifying change at all → "candidate cause: none — no state-altering change in the window".
//
// Each <change>/<symptom> is rendered by describeEvent.
func candidateCauseV2(events []change.ChangeEvent) string {
// Find the earliest (oldest) symptom.
var earliestSymptom *change.ChangeEvent
for i := range events {
e := &events[i]
if !symptomKinds[e.Kind] {
continue
}
if earliestSymptom == nil || e.Time.Before(earliestSymptom.Time) {
earliestSymptom = e
}
}

if earliestSymptom != nil {
// events is newest-first, so the first changeKinds entry with
// Time < symptom.Time is the most recent change before the symptom.
for i := range events {
e := &events[i]
if changeKinds[e.Kind] && e.Time.Before(earliestSymptom.Time) {
return fmt.Sprintf("candidate cause: %s — preceded symptom %s",
describeEvent(e), describeEvent(earliestSymptom))
}
}
return fmt.Sprintf(
"candidate cause: none — earliest symptom %s has no preceding change found in window",
describeEvent(earliestSymptom),
)
}

// No symptom: fall back to the most recent change event.
for i := range events {
e := &events[i]
if changeKinds[e.Kind] {
return "candidate cause: " + describeEvent(e)
}
}
return "candidate cause: none — no state-altering change in the window"
}

// describeEvent renders a compact human description of an event:
// "<source> <kind> on <target>", plus its quoted summary and a before→after
// segment when present, suffixed with the UTC RFC3339 timestamp. Empty fields
// are omitted so there are no dangling separators.
func describeEvent(e *change.ChangeEvent) string {
var b strings.Builder
if e.Source != "" {
b.WriteString(e.Source)
b.WriteByte(' ')
}
b.WriteString(e.Kind)
if e.Target != "" {
b.WriteString(" on ")
b.WriteString(e.Target)
}
if e.Summary != "" {
fmt.Fprintf(&b, " %q", e.Summary)
}
if e.Before != "" || e.After != "" {
fmt.Fprintf(&b, " (%s→%s)", e.Before, e.After)
}
fmt.Fprintf(&b, " @ %s", e.Time.UTC().Format(time.RFC3339))
return b.String()
}
108 changes: 108 additions & 0 deletions internal/core/tools/correlate/cause_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package correlate

import (
"strings"
"testing"
"time"

"github.com/rlaope/cloudy/internal/core/tools/change"
)

var (
t0 = time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
t1 = t0.Add(time.Minute)
t2 = t0.Add(2 * time.Minute)
t3 = t0.Add(3 * time.Minute)
)

func evt(kind, summary string, at time.Time) change.ChangeEvent {
return change.ChangeEvent{Kind: kind, Summary: summary, Time: at}
}

// merge wraps MergeSorted with no limit, producing newest-first order.
func merge(events ...change.ChangeEvent) []change.ChangeEvent {
return change.MergeSorted(0, events)
}

func TestCandidateCauseV2_SymptomWithPriorChange(t *testing.T) {
// metric_breach at T2, image at T1, scale at T0 — image is the most
// recent change before the symptom.
events := merge(
evt("metric_breach", "error rate spiked", t2),
evt("image", "deployed v2.1", t1),
evt("scale", "replicas 2→5", t0),
)
got := candidateCauseV2(events)
if !strings.Contains(got, "deployed v2.1") {
t.Fatalf("expected image change summary in output, got: %s", got)
}
if !strings.Contains(got, "image") {
t.Fatalf("expected kind 'image' in output, got: %s", got)
}
if !strings.Contains(got, "error rate spiked") {
t.Fatalf("expected symptom summary in output, got: %s", got)
}
if !strings.HasPrefix(got, "candidate cause:") {
t.Fatalf("expected 'candidate cause:' prefix, got: %s", got)
}
}

func TestCandidateCauseV2_SymptomWithoutPriorChange(t *testing.T) {
// log_error at T2, only change is at T3 (after the symptom).
events := merge(
evt("rollout", "deployed v3", t3),
evt("log_error", "OOM killed", t2),
)
got := candidateCauseV2(events)
if !strings.Contains(got, "no preceding change found") {
t.Fatalf("expected no-preceding-change message, got: %s", got)
}
if !strings.Contains(got, "OOM killed") {
t.Fatalf("expected symptom summary in output, got: %s", got)
}
}

func TestCandidateCauseV2_NoSymptomFallback(t *testing.T) {
// Only change events — falls back to v1: most recent change.
events := merge(
evt("rollout", "rolled out v1.9", t2),
evt("scale", "scaled down", t1),
)
got := candidateCauseV2(events)
if !strings.HasPrefix(got, "candidate cause:") {
t.Fatalf("expected 'candidate cause:' prefix, got: %s", got)
}
if !strings.Contains(got, "rolled out v1.9") {
t.Fatalf("expected most recent change summary, got: %s", got)
}
}

func TestCandidateCauseV2_Empty(t *testing.T) {
got := candidateCauseV2(nil)
if !strings.Contains(got, "candidate cause: none") {
t.Fatalf("expected 'candidate cause: none' for empty input, got: %s", got)
}
}

func TestCandidateCauseV2_EarliestSymptomSelected(t *testing.T) {
// Two symptoms: trace_error at T2, metric_breach at T1 (earlier).
// One change at T0 (before both). Should align with the earliest symptom
// (metric_breach at T1) and the change at T0.
events := merge(
evt("trace_error", "slow span", t2),
evt("metric_breach", "cpu spike", t1),
evt("image", "deployed v5", t0),
)
got := candidateCauseV2(events)
if !strings.Contains(got, "cpu spike") {
t.Fatalf("expected earliest symptom 'cpu spike' in output, got: %s", got)
}
if !strings.Contains(got, "deployed v5") {
t.Fatalf("expected change 'deployed v5' in output, got: %s", got)
}
// The change immediately before the earliest symptom (T0 < T1) is picked,
// not the one before the later symptom.
if strings.Contains(got, "slow span") {
t.Fatalf("should not reference later symptom 'slow span', got: %s", got)
}
}
168 changes: 168 additions & 0 deletions internal/core/tools/correlate/log_source.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package correlate

import (
"context"
"encoding/json"
"fmt"
"net/url"
"strconv"
"strings"
"time"

dockerclient "github.com/rlaope/cloudy/internal/clients/docker"
"github.com/rlaope/cloudy/internal/core/tools"
"github.com/rlaope/cloudy/internal/core/tools/change"
tlog "github.com/rlaope/cloudy/internal/core/tools/log"
)

// logSource folds log error spikes onto the change timeline as symptoms:
// ChangeEvents whose Kind is "log_error" and Source is "log". It draws from
// Loki backends only — Docker container logs are already exposed by the
// separate log.container tool, so this source is a no-op when no Loki clients
// are configured.
type logSource struct {
logs tlog.Clients
dockerHub *dockerclient.Hub
}

// newLogSource builds a logSource over the configured log backends. It returns
// nil — so callers can omit the source — when neither a Loki client nor a
// Docker hub is available to pull logs from.
func newLogSource(logs tlog.Clients, dockerHub *dockerclient.Hub) change.ChangeSource {
if len(logs.Loki) == 0 && dockerHub == nil {
return nil
}
return &logSource{logs: logs, dockerHub: dockerHub}
}

func (s *logSource) Name() string { return "log" }

// RecentChanges scans recent Loki logs for q.Workload and emits a "log_error"
// symptom event when error-level lines are found in the window. Docker
// container logs are handled by the dedicated log.container tool; this source
// focuses on Loki. If no Loki clients are configured, it returns (nil, nil).
func (s *logSource) RecentChanges(ctx context.Context, q change.ChangeQuery) ([]change.ChangeEvent, error) {
// No Loki clients — nothing to do. Docker logs are out of scope here.
if len(s.logs.Loki) == 0 {
return nil, nil
}

client, err := tools.PickEndpoint(s.logs.Loki, q.Context, "correlate", "loki endpoint")
if err != nil {
return nil, err
}

since := q.Since
if since == 0 {
since = time.Hour
}
now := time.Now()
end := now.Unix()
start := now.Add(-since).Unix()

// LogQL selector: {app="<workload>"} is the canonical k8s-via-Promtail
// label. v2 supports only the `app` label scheme and is namespace-agnostic;
// clusters using a different label (container/pod/service_name) or needing
// namespace scoping should query Loki via the log.* tools directly. %q
// escapes the workload so it cannot break out of the selector string.
logqlQuery := fmt.Sprintf(`{app=%q} |~ "(?i)(error|fatal|panic)"`, q.Workload)

params := url.Values{
"query": {logqlQuery},
"start": {strconv.FormatInt(start*int64(time.Second), 10)},
"end": {strconv.FormatInt(end*int64(time.Second), 10)},
"limit": {"5000"},
}

body, err := client.RawGet(ctx, "/loki/api/v1/query_range", params)
if err != nil {
return nil, fmt.Errorf("log_source: loki query: %w", err)
}

tsLines, err := parseLokiTimestamped(body)
if err != nil {
return nil, fmt.Errorf("log_source: parse: %w", err)
}

events := logErrorEvents(tsLines, q.Workload)
return events, nil
}

// TimestampedLine is a single log line with its nanosecond-precision timestamp
// decoded from the Loki values array. It is exported at package level so the
// pure helper functions are directly testable without a live Loki client.
type TimestampedLine struct {
Time time.Time
Text string
}

// parseLokiTimestamped decodes the Loki query_range envelope and returns lines
// with real time.Time values (parsed from nanosecond Unix strings).
func parseLokiTimestamped(body []byte) ([]TimestampedLine, error) {
var env struct {
Data struct {
Result []struct {
Values [][2]string `json:"values"`
} `json:"result"`
} `json:"data"`
}
if err := json.Unmarshal(body, &env); err != nil {
return nil, err
}
var out []TimestampedLine
for _, s := range env.Data.Result {
for _, v := range s.Values {
ns, err := strconv.ParseInt(v[0], 10, 64)
if err != nil {
continue
}
out = append(out, TimestampedLine{
Time: time.Unix(0, ns),
Text: v[1],
})
}
}
return out, nil
}

// isErrorLine reports whether a log line indicates an error condition.
// The check is case-insensitive; blank lines are never considered errors.
func isErrorLine(line string) bool {
if strings.TrimSpace(line) == "" {
return false
}
lower := strings.ToLower(line)
return strings.Contains(lower, "error") ||
strings.Contains(lower, "fatal") ||
strings.Contains(lower, "panic") ||
strings.Contains(lower, "level=error")
}

// logErrorEvents scans lines for error-level entries and, when at least one is
// found, emits a single ChangeEvent anchored at the earliest error line's
// timestamp. The Summary records the total error-line count in the window.
func logErrorEvents(lines []TimestampedLine, workload string) []change.ChangeEvent {
var earliest time.Time
count := 0
for _, l := range lines {
if !isErrorLine(l.Text) {
continue
}
count++
if earliest.IsZero() || l.Time.Before(earliest) {
earliest = l.Time
}
}
if count == 0 {
return nil
}
return []change.ChangeEvent{
{
Time: earliest,
Kind: "log_error",
Source: "log",
Target: workload,
Summary: fmt.Sprintf("%d error line(s) in window", count),
},
}
}
Loading
Loading